Spring Boot嵌入式HornetQ集群不转发消息 - java

我正在尝试使用嵌入式HornetQ服务器创建两个Spring Boot应用程序的静态集群。一个应用程序/服务器将处理外部事件并生成要发送到消息队列的消息。另一个应用程序/服务器将在消息队列上侦听并处理传入的消息。由于两个应用程序之间的链接不可靠,因此每个应用程序将仅使用本地/ inVM客户端在其各自的服务器上生成/使用消息,并依靠群集功能将消息转发到群集中其他服务器上的队列。

我正在使用HornetQConfigurationCustomizer自定义嵌入式HornetQ服务器,因为默认情况下,它仅随InVMConnectorFactory一起提供。

我创建了两个示例应用程序来说明此设置,在整个示例中,“ ServerSend”是指将生成消息的服务器,而“ ServerReceive”是将使用消息的服务器。

这两个应用程序的pom.xml都包含:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-server</artifactId>
</dependency>

DemoHornetqServerSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Server: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverReceiveConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverSendConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

application.properties(ServerSend):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

DemoHornetqServerReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${spring.hornetq.embedded.queues}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
    }

    @JmsListener(destination="${spring.hornetq.embedded.queues}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {

            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";

                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverSendConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster", // name
                        "jms", // address
                        serverReceiveConnectorName, // connector name
                        500, // retry interval
                        true, // duplicate detection
                        true, // forward when no consumers
                        1, // max hops
                        1000000, // confirmation window size
                        staticConnectors, 
                        true // allow direct connections only
                        );
                configuration.getClusterConfigurations().add(conf);

                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}

application.properties(ServerReceive):

spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password

启动两个应用程序后,日志输出显示以下内容:

服务器发送:

2015-04-09 11:11:58.471  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:11:58.501  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:11:58.595  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:11:58.720  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:11:59.568  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:11:59.593  INFO 7536 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 

服务器接收:

2015-04-09 11:12:04.401  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)  
2015-04-09 11:12:04.410  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221045: libaio is not available, switching the configuration into NIO  
2015-04-09 11:12:04.520  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221043: Adding protocol support CORE  
2015-04-09 11:12:04.629  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221003: trying to deploy queue jms.queue.jms.testqueue  
2015-04-09 11:12:05.545  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221007: Server is now live  
2015-04-09 11:12:05.578  INFO 4528 --- [           main] org.hornetq.core.server                  : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124)   [c139929d-d90f-11e4-ba2e-e58abf5d6944] 

我在两个输出中都看到了clustered=true,如果我从false中删除​​了集群配置,它将显示HornetQConfigurationCustomizer,因此它必须具有一定的作用。

现在,ServerSend在控制台输出中显示了这一点:

Sending message: Timestamp from Server: 1428574324910  
Sending message: Timestamp from Server: 1428574329899  
Sending message: Timestamp from Server: 1428574334904  

但是,ServerReceive不显示任何内容。

似乎消息没有从ServerSend转发到ServerReceive。

我通过创建另外两个Spring Boot应用程序(ClientSend和ClientReceive)进行了更多测试,这些应用程序没有嵌入HornetQ服务器,而是连接到“本机”服务器。

两个客户端应用程序的pom.xml都包含:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>

DemoHornetqClientSendApplication:

@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientSendApplication.class, args);
    }

    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Client: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }
}

application.properties(ClientSend):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446

queue=jms.testqueue

DemoHornetqClientReceiveApplication:

@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {
    @Autowired
    private JmsTemplate jmsTemplate;
    private @Value("${queue}") String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
    }

    @JmsListener(destination="${queue}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

application.properties(ClientReceive):

spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445

queue=jms.testqueue

现在控制台显示以下内容:

ServerReveive:

Received message: Timestamp from Client: 1428574966630  
Received message: Timestamp from Client: 1428574971600  
Received message: Timestamp from Client: 1428574976595  

客户接收:

Received message: Timestamp from Server: 1428574969436  
Received message: Timestamp from Server: 1428574974438  
Received message: Timestamp from Server: 1428574979446  

如果ServerSend运行了一段时间,然后启动ClientReceive,它还会接收到该点排队的所有消息,因此这表明这些消息不仅会消失在某个地方,或者会从其他地方消耗掉。

为了完整起见,我也将ClientSend指向ServerSend,将ClientReceive指向ServerReceive,以查看集群和InVM客户端是否存在问题,但同样没有溢出表明收到了任何消息在ClientReceiveServerReceive中。

这样看来,往/自每个嵌入式代理到直接连接的外部客户端的消息传递正常,但是在群集中的代理之间没有消息转发。

因此,毕竟,这是一个大问题,在群集内不转发消息的设置有什么问题?

参考方案

http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

“ HornetQ核心被设计为一组简单的POJO,因此,如果您的应用程序内部需要消息传递功能,但又不想将其作为HornetQ服务器公开,则可以直接实例化HornetQ服务器并将其嵌入您自己的应用程序中。”

如果要嵌入它,则不会将其作为服务器公开。每个容器都有一个单独的实例。这等效于启动大黄蜂的两个副本并为其赋予相同的队列名称。一个在第一个实例上写入该队列,另一个在第二个实例上侦听该队列。

如果要以这种方式解耦应用程序,则需要在单个位置充当服务器。可能是您想要集群。这不是BTW的Hornet特有的。您会经常发现这种模式。

Spring MVC拦截器映射问题 - java

我有这段XML:<mvc:interceptors> <mvc:interceptor> <mvc:mapping path="/statics/**" /> <bean class="com.company.website.servlet.StaticsHandlerIntercept…

Java:“自动装配”继承与依赖注入 - java

Improve this question 我通常以常见的简单形式使用Spring框架: 控制器服务存储库通常,我会在CommonService类中放一个通用服务,并使所有其他服务扩展到类中。一个开发人员告诉我,最好在每个服务中插入CommonClass而不是使用继承。我的问题是,有一个方法比另一个更好吗? JVM或性能是否会受到另一个影响?更新资料Comm…

Spring Boot如何在POST之后返回响应 - java

我想创建一个新客户并在创建客户后返回客户编号。客户编号必须是从50000开始的自动递增的唯一编号。到目前为止,我已经成功创建了一个客户,但是我不确定应该如何生成客户编号,将其保存到数据库中,并在触发POST时将其作为成功消息显示给用户。json下面是所需的响应;{ "customerNumber": "50002", …

Spring中的应用程序上下文有什么作用? - java

我昨天问了一个问题(Using Spring in standalone apps),有关如何在独立应用程序中使用Spring。由此得知,您只创建一次应用程序上下文对象。因此,现在的问题是(即使在评论中得到了部分回答)创建应用程序上下文时会发生什么?当您说时,Spring是否会创建这些豆子并将它们连接在一起new ClassPathXmlApplicatio…

注解中的Spring属性值 - java

如何获取注释内的属性值。例如我有一个注释@GetMyValue(value1="Val1",intVal=10) 现在,我希望“ Val1”和10来自属性文件。我试过了@GetMyValue(value1="${test.value}",intVal="${test.int.value}") 这不起…