带有并发使用者的Springboot RabbitMQ - java

我有一个简单的体系结构,其中三个进程之一(实际上是一个弹簧引导的dockerized容器)将数千条消息生成到交换/队列,并且所有三个进程都使用这些消息。

我希望使用方多线程以实现最大吞吐量,但是使用我当前的配置,我无法实现我想要的功能。

这是设置:

听众:

  @RabbitListener(
    bindings = @QueueBinding(
      value = @Queue(
                value = RabbitMQConfiguration.BORD_QUEUE, 
                durable = "true", 
                arguments = @Argument(name = "x-queue-mode", value = "lazy")),
      exchange = @Exchange(value = RabbitMQConfiguration.QUEUE_EXCHANGE),
      key = "B"), 
    concurrency = "8-8")

  public void listenBordero(Long bId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Received message: {{}}", bId);
    processor.process(bId);
  }

这是-可能相关的-springboot application.properties配置:

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.max-concurrency=8
spring.rabbitmq.listener.simple.concurrency=8

尽管我认为由于基于@RabbitListener批注的配置,并非所有它们都是必需的。

在rabbitmq管理中,我看到了
带有并发使用者的Springboot RabbitMQ - java
具有状态主要是空闲。

从我的springboot-logs中,我几乎看不到线程发生的情况(日志中的线程名称仅在行行doezens之后才更改,而我希望切换的频率更高)。

队列显示在管理控制台中:

带有并发使用者的Springboot RabbitMQ - java

我的期望是在频道页面上看到消费者的统计信息更多地处于“运行”状态。

谁能启发我?

提前致谢!

编辑:
在激活spring-amqp的调试后,我看到接收到多条消息,但是之后,仅对整个名为“ ..Container#0-n”的线程进行处理。我本来希望多个正在运行的容器线程的输出混合在一起。这可能是ThreadPool问题吗?

这是一个(降噪后的)日志摘录:

2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-7] d.i.clearing.rabbit.RabbitMQListener     : Received message: {436}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-5] d.i.clearing.rabbit.RabbitMQListener     : Received message: {450}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-12] d.i.clearing.rabbit.RabbitMQListener     : Received message: {414}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-11] d.i.clearing.rabbit.RabbitMQListener     : Received message: {418}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-8] d.i.clearing.rabbit.RabbitMQListener     : Received message: {432}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-2] d.i.clearing.rabbit.RabbitMQListener     : Received message: {456}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-15] d.i.clearing.rabbit.RabbitMQListener     : Received message: {469}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-16] d.i.clearing.rabbit.RabbitMQListener     : Received message: {394}
2020-03-05 08:15:21.418  INFO 32960 --- [tContainer#0-13] d.i.clearing.rabbit.RabbitMQListener     : Received message: {409}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-4] d.i.clearing.rabbit.RabbitMQListener     : Received message: {452}
2020-03-05 08:15:21.418  INFO 32960 --- [ntContainer#0-9] d.i.clearing.rabbit.RabbitMQListener     : Received message: {431}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-10] d.i.clearing.rabbit.RabbitMQListener     : Received message: {420}
2020-03-05 08:15:21.419  INFO 32960 --- [tContainer#0-14] d.i.clearing.rabbit.RabbitMQListener     : Received message: {350}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-1] d.i.clearing.rabbit.RabbitMQListener     : Received message: {400}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-3] d.i.clearing.rabbit.RabbitMQListener     : Received message: {453}
2020-03-05 08:15:21.419  INFO 32960 --- [ntContainer#0-6] d.i.clearing.rabbit.RabbitMQListener     : Received message: {449}
... Logging Noise removed
2020-03-05 08:15:26.409  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : Calculating {7183042} - {8330}/{5500}  
2020-03-05 08:15:26.411  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.412 DEBUG 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.413  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.507  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Clearing  : ... Logging Noise removed
2020-03-05 08:15:26.669  WARN 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:26.669  INFO 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:33.012 DEBUG 32960 --- [ntContainer#0-3] de.idslogistik.clearing.api.p2.Rddel     : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.013 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed
2020-03-05 08:15:33.014 DEBUG 32960 --- [ntContainer#0-3] d.i.i.util.clearing.VerSendungTools      : ... Logging Noise removed

参考方案

prefetch = 1表示当前队列中未确认消息的最大数量为1。如果不确认此消息,则下一条消息无法进入消耗队列。
或者,您可以考虑增加使用者的数量以加快邮件处理速度
https://www.rabbitmq.com/tutorials/tutorial-two-java.html

java:继承 - java

有哪些替代继承的方法? java大神给出的解决方案 有效的Java:偏重于继承而不是继承。 (这实际上也来自“四人帮”)。他提出的理由是,如果扩展类未明确设计为继承,则继承会引起很多不正常的副作用。例如,对super.someMethod()的任何调用都可以引导您通过未知代码的意外路径。取而代之的是,持有对本来应该扩展的类的引用,然后委托给它。这是与Eric…

Java:BigInteger,如何通过OutputStream编写它 - java

我想将BigInteger写入文件。做这个的最好方式是什么。当然,我想从输入流中读取(使用程序,而不是人工)。我必须使用ObjectOutputStream还是有更好的方法?目的是使用尽可能少的字节。谢谢马丁 参考方案 Java序列化(ObjectOutputStream / ObjectInputStream)是将对象序列化为八位字节序列的一种通用方法。但…

Java-如何将此字符串转换为日期? - java

我从服务器收到此消息,我不明白T和Z的含义,2012-08-24T09:59:59Z将此字符串转换为Date对象的正确SimpleDateFormat模式是什么? java大神给出的解决方案 这是ISO 8601标准。您可以使用SimpleDateFormat simpleFormat = new SimpleDateFormat("yyyy-MM…

Java:从类中查找项目名称 - java

仅通过类的实例,如何使用Java反射或类似方法查找项目名称?如果不是,项目名称(我真正想要的是)可以找到程序包名称吗? 参考方案 项目只是IDE使用的简单组织工具,因此项目名称不是类或JVM中包含的信息。要获取软件包,请使用Class#getPackage()。然后,可以调用Package#getName()将包作为您在代码的包声明中看到的String来获取…

JAVA 8具有任何匹配属性的对象的过滤器列表 - java

我的要求是通过匹配任何属性的字符串来过滤对象列表。例如,假设Contact类具有三个属性:街道,城市,电话。我知道java流过滤器是如何工作的,在这里我必须将输入字符串与每个属性进行比较,如下所示:contactList.stream().filter(contact -> contact.getStreet().equals("dubai&…