• 广播

producer(生产者)

在 producer 的 handler 中增加 topic 方法,其实就是将原来的 ActiveMQQueue 换成了 ActiveMQTopic


    /**
     * Publishes a message to a topic (method of ActiveMQHandler).
     *
     * @param topicName name of the topic the message is published to
     * @param data      message payload passed to the template for conversion and sending
     */
    public void topic(String topicName, String data) {
        log.info("---->发送订阅消息" + topicName + ",data:" + data);
        // Use the caller-supplied topic name rather than a fixed literal, so the
        // destination always matches the name that is logged above.
        template.convertAndSend(new ActiveMQTopic(topicName), data);
    }


// controller 调用

    /**
     * Handles requests mapped to {@code /topic}: passes the topic name
     * {@code topic01} and a fixed greeting to the handler.
     *
     * @return the literal string {@code "success"} after the handler call returns
     */
    @RequestMapping("/topic")
    public String topic() {
        final String destination = "topic01";
        final String payload = "hello world";
        handler.topic(destination, payload);
        return "success";
    }

 spring: 
  jms:
    # Enable this setting if topics (publish/subscribe) are to be used
    pub-sub-domain: true

customer

设定了系统中的消息类别为topic:

pub-sub-domain: true

  • 为了同时支持 topic 和 queue,需要在消费者(customer)端分别为 topic 和 queue 配置各自的监听容器工厂(JmsListenerContainerFactory),并在 @JmsListener 中通过 containerFactory 指定使用哪一个。

/**
 * Declares two JMS listener container factories over the same connection
 * factory: one with the pub/sub domain enabled (topic mode) and one that
 * leaves the pub/sub setting untouched (queue mode).
 */
@Configuration
public class ListenerContainerConfig {

	/**
	 * Topic-mode listener container factory.
	 *
	 * @param activeMQConnectionFactory connection factory the listener containers will use
	 * @return a factory with the pub/sub domain switched on
	 */
	@Bean
	public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
		DefaultJmsListenerContainerFactory topicFactory = newFactory(activeMQConnectionFactory);
		topicFactory.setPubSubDomain(true);
		return topicFactory;
	}

	/**
	 * Queue-mode listener container factory.
	 *
	 * @param activeMQConnectionFactory connection factory the listener containers will use
	 * @return a factory whose pub/sub setting is left at its default
	 */
	@Bean
	public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
		return newFactory(activeMQConnectionFactory);
	}

	/** Creates a listener container factory bound to the given connection factory. */
	private static DefaultJmsListenerContainerFactory newFactory(ConnectionFactory connectionFactory) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		return factory;
	}
}

监听不同的队列(queue)和主题(topic)


/**
 * JMS message listeners. Each method is bound to one destination and names
 * the listener container factory (queue or topic) it should be served by.
 */
@Slf4j
@EnableJms
@Component
public class Customer {

    /**
     * Receives messages from destination {@code queue01} via the queue container factory.
     *
     * @param msg message body
     */
    @JmsListener(destination = "queue01", containerFactory = "jmsListenerContainerQueue")
    public void customer(String msg) {
        final String line = msg + "queue01" + LocalDateTime.now() + "------------queue01";
        System.out.println(line);
    }

    /**
     * Receives messages from destination {@code delaySend01} via the queue container factory.
     *
     * @param msg message body
     */
    @JmsListener(destination = "delaySend01", containerFactory = "jmsListenerContainerQueue")
    public void customer2(String msg) {
        final String line = "接收延时消息:" + msg + "local" + LocalDateTime.now();
        System.out.println(line);
    }

    /**
     * Receives messages from destination {@code topic01} via the topic container factory.
     *
     * @param msg message body
     */
    @JmsListener(destination = "topic01", containerFactory = "jmsListenerContainerTopic")
    public void customer3(String msg) {
        final String line = "收到订阅消息----topic:" + msg;
        System.out.println(line);
    }

}

spring:
  activemq:
    # Broker account and password
    user: user
    password: user
    #URL of the ActiveMQ broker.
    broker-url: tcp://127.0.0.1:61616
    in-memory: false
    # A connection pool must be used
    pool:
      # Enable the connection pool
      enabled: true
      # Maximum number of pooled connections
      max-connections: 5
      # Expiry time for idle connections; the default is 30 seconds
      idle-timeout: 30s