point to point

producter

 
     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
<!-- Add this dependency when spring.activemq.pool.enabled is set to true -->
        <dependency>
            <groupId>org.messaginghub</groupId>
            <artifactId>pooled-jms</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies> 

yml

spring:
  activemq:
    # Broker login: user name and password
    user: user
    password: user
    # URL of the ActiveMQ broker.
    broker-url: tcp://127.0.0.1:61616
    in-memory: false
    # A connection pool must be used
    pool:
      # Enable the connection pool
      enabled: true
      # Maximum number of connections in the pool
      max-connections: 5
      # Expiry time for idle connections; the default is 30 seconds
      idle-timeout: 30s


producter 提供一个产生消息的接口,每调用一次就产生一条消息(消息内容是一个字符串)。

启动类增加注解

  • @EnableJms
/**
 * Entry point of the demo application.
 *
 * <p>Enables JMS listener support and scans {@code com.example.demo.mapper}
 * for mapper interfaces.
 */
@SpringBootApplication
@EnableJms
@MapperScan(basePackages = "com.example.demo.mapper")
public class DemoApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}


import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that publishes one text message to the {@code queue01}
 * queue each time {@code /queue} is requested.
 */
@RestController(value = "jms")
public class Producter {

    /** Name of the queue that messages are published to. */
    private static final String QUEUE_NAME = "queue01";

    private final JmsMessagingTemplate template;

    @Autowired
    public Producter(JmsMessagingTemplate template) {
        this.template = template;
    }

    /**
     * Builds a message containing the current timestamp and sends it to the queue.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/queue")
    public String queue() {
        String payload = "我是消息内容, " + System.currentTimeMillis();
        Destination target = new ActiveMQQueue(QUEUE_NAME);
        template.convertAndSend(target, payload);
        return "success";
    }
}

/**
 * Spring configuration that defines the queue bean.
 */
@Configuration
public class BeanConfig {

    /** Name of the queue exposed as a bean. */
    private static final String QUEUE_NAME = "sync_org";

    /**
     * Creates the ActiveMQ queue destination named {@code sync_org}.
     *
     * @return the queue destination
     */
    @Bean
    Queue queue() {
        return new ActiveMQQueue(QUEUE_NAME);
    }
}
  • 查看activemq的管理界面,Queues界面多了一条记录,名称是queue01,因为没有启动customer,没有消费者,所以这个消息会一直保存着。

customer

customer 端不需要那么麻烦:创建一个 class,加上 @Component 注解,在处理方法上加 @JmsListener,并通过 destination 指定要监听的队列名称。

/**
 * Listener component that receives text messages from the {@code queue01} queue.
 */
@Component
@EnableJms
public class Customer {

    /** Name of the queue this component listens on. */
    private static final String QUEUE_NAME = "queue01";

    /**
     * Prints a header line to standard output and the message body to standard error.
     *
     * @param msg text payload of the received message
     */
    @JmsListener(destination = QUEUE_NAME)
    public void customer(String msg) {
        System.out.println("接收到的消息:");
        System.err.println(msg);
    }
}

查看activemq状态,已经被接收了。

延时消息

要使用延迟消息【比如 2 小时后才送达消息,默认是即时送达】,必须先修改 broker 的配置:找到 activemq.xml 文件,在 broker 节点上添加 schedulerSupport="true" 这个属性。

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" schedulerSupport="true" dataDirectory="${activemq.data}">

延迟消息,构建一个handler

@Slf4j
@Component
public class ActiveMQHandler {

    private final JmsMessagingTemplate template;

    @Autowired
    public ActiveMQHandler(JmsMessagingTemplate template) {
        this.template = template;
    }

    /**
     * 发送即时消息
     * @param destination
     * @param data
     */
    public void send(String queueName, String data) {
        log.info(">>>>>>>立即发送:" + data);
        template.convertAndSend(new ActiveMQQueue(queueName), data);
    }

    /**
     * 延时发送的信息
     * @param name 监听的名称
     * @param data 发送的数据
     * @param time 延时多少时间处理消息.
     */
    public void delaySend(String name, String data, long time) {
        log.info("====>>> 延时任务:" + name + ",data:" + data);
        //获取连接工厂
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            //获取连接
            assert connectionFactory != null;
            Connection connection = connectionFactory.createConnection();
            connection.start();
            //获取session, true开启事务,false关闭事务
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(name);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage message = session.createTextMessage(data);
            //设置延迟时间 //AMQ_SCHEDULED_DELAY
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time * 1000L);
            //发送
            producer.send(message);
            session.commit();
            producer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

延时发送


import java.time.LocalDateTime;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.demo.config.ActiveMQHandler;
/**
 * REST controller exposing one endpoint for immediate messages and one for
 * delayed messages, both delegating to {@link ActiveMQHandler}.
 */
@RestController
public class Producter {

    /** Queue that receives immediate messages. */
    private static final String INSTANT_QUEUE = "queue01";

    /** Queue that receives delayed messages. */
    private static final String DELAY_QUEUE = "delaySend01";

    /** Number of delayed messages sent per request. */
    private static final int DELAYED_MESSAGE_COUNT = 10;

    /** Delay value handed to the handler for every delayed message. */
    private static final long DELAY_TIME = 20L;

    private final ActiveMQHandler handler;

    @Autowired
    public Producter(ActiveMQHandler handler) {
        this.handler = handler;
    }

    /**
     * Sends one immediate message containing the current timestamp.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/queue")
    public String queue() {
        String payload = "我是消息内容, " + System.currentTimeMillis();
        handler.send(INSTANT_QUEUE, payload);
        return "success";
    }

    /**
     * Sends a batch of delayed messages, each tagged with its index and the
     * local time at which it was created.
     *
     * @return the literal string {@code "success"}
     */
    @RequestMapping("/delaySend")
    public String delaySend() {
        int index = 0;
        while (index < DELAYED_MESSAGE_COUNT) {
            String payload = "我是延迟消息内容, " + index + LocalDateTime.now();
            handler.delaySend(DELAY_QUEUE, payload, DELAY_TIME);
            index++;
        }
        return "success";
    }
}

延时接收


import java.time.LocalDateTime;

import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;


/**
 * Listener component with one handler for the immediate queue and one for
 * the delayed queue.
 */
@Slf4j
@Component
@EnableJms
public class Customer {

    /** Queue carrying immediate messages. */
    private static final String INSTANT_QUEUE = "queue01";

    /** Queue carrying delayed messages. */
    private static final String DELAY_QUEUE = "delaySend01";

    /**
     * Prints a received immediate message, with the receive time, to standard error.
     *
     * @param msg text payload of the received message
     */
    @JmsListener(destination = INSTANT_QUEUE)
    public void customer(String msg) {
        LocalDateTime receivedAt = LocalDateTime.now();
        String line = msg + "queue01" + receivedAt + "------------queue01";
        System.err.println(line);
    }

    /**
     * Logs a received delayed message together with the receive time.
     *
     * @param msg text payload of the received message
     */
    @JmsListener(destination = DELAY_QUEUE)
    public void customer2(String msg) {
        LocalDateTime receivedAt = LocalDateTime.now();
        String line = "接收延时消息:" + msg + "local" + receivedAt;
        log.info(line);
    }
}