point to point
producer(生产者)
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!-- 当 spring.activemq.pool.enabled 为 true 时,必须引入下面的 pooled-jms 依赖 -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
yml
spring:
  activemq:
    # Broker credentials
    user: user
    password: user
    # URL of the ActiveMQ broker.
    broker-url: tcp://127.0.0.1:61616
    in-memory: false
    # A connection pool must be used
    pool:
      # Enable the connection pool
      enabled: true
      # Maximum number of pooled connections
      max-connections: 5
      # Idle connection expiry time; the default is 30 seconds
      idle-timeout: 30s
producer 提供一个产生消息的接口,每调用一次就产生一条消息(消息内容是一个字符串)。
启动类增加注解
- @EnableJms
/**
 * Spring Boot entry point of the demo application.
 *
 * <p>{@code @EnableJms} is declared here so that JMS listener annotations are processed;
 * {@code @MapperScan} registers the mappers found under {@code com.example.demo.mapper}.
 */
@SpringBootApplication
@EnableJms
@MapperScan(basePackages = { "com.example.demo.mapper" })
public class DemoApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments forwarded unchanged to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
发
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that publishes one text message to the queue {@code queue01} per request.
 *
 * <p>NOTE(review): {@code value = "jms"} on {@code @RestController} sets the bean name, not a
 * URL prefix, so the handler below is served at {@code /queue} — confirm this is intended.
 */
@RestController(value = "jms")
public class Producter {

    /** Messaging template used to convert and send the payload. */
    private final JmsMessagingTemplate template;

    @Autowired
    public Producter(JmsMessagingTemplate template) {
        this.template = template;
    }

    /**
     * Sends a timestamped text message to the queue {@code queue01}.
     *
     * @return the literal {@code "success"} once the send call has returned
     */
    @RequestMapping("/queue")
    public String queue() {
        // The destination is a queue named "queue01" (this is the target, not the message itself).
        final Destination targetQueue = new ActiveMQQueue("queue01");
        final String payload = "我是消息内容, " + System.currentTimeMillis();

        template.convertAndSend(targetQueue, payload);
        return "success";
    }
}
/**
 * Spring configuration that declares a queue destination as a bean.
 */
@Configuration
public class BeanConfig {

    /**
     * Exposes the queue named {@code sync_org} as a bean.
     *
     * @return a new {@link ActiveMQQueue} for {@code sync_org}
     */
    @Bean
    Queue queue() {
        final ActiveMQQueue syncOrgQueue = new ActiveMQQueue("sync_org");
        return syncOrgQueue;
    }
}
- 查看activemq的管理界面,Queues界面多了一条记录,名称是queue01,因为没有启动customer,没有消费者,所以这个消息会一直保存着。
customer
customer(消费者)端不需要那么麻烦:创建一个类并加上 @Component 注解,在接收消息的方法上加 @JmsListener 注解,并通过 destination 指定要监听的队列名称即可。
收
/**
 * Consumer component that listens on the queue {@code queue01}.
 */
@Component
@EnableJms
public class Customer {

    /**
     * Handles one message from {@code queue01}: prints a header line to standard output and
     * the message body to standard error.
     *
     * @param msg text body of the received message
     */
    @JmsListener(destination = "queue01")
    public void customer(String msg) {
        final String header = "接收到的消息:";
        System.out.println(header);
        System.err.println(msg);
    }
}
查看activemq状态,已经被接收了。
延时消息
要使用延迟消息(例如 2 小时后才送达;默认是即时送达),必须先修改 broker 的配置:找到 activemq.xml 文件,在 broker 元素上添加 schedulerSupport="true" 这个属性,如下所示:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" schedulerSupport="true" dataDirectory="${activemq.data}">
延迟消息,构建一个handler
@Slf4j
@Component
public class ActiveMQHandler {
private final JmsMessagingTemplate template;
@Autowired
public ActiveMQHandler(JmsMessagingTemplate template) {
this.template = template;
}
/**
* 发送即时消息
* @param destination
* @param data
*/
public void send(String queueName, String data) {
log.info(">>>>>>>立即发送:" + data);
template.convertAndSend(new ActiveMQQueue(queueName), data);
}
/**
* 延时发送的信息
* @param name 监听的名称
* @param data 发送的数据
* @param time 延时多少时间处理消息.
*/
public void delaySend(String name, String data, long time) {
log.info("====>>> 延时任务:" + name + ",data:" + data);
//获取连接工厂
ConnectionFactory connectionFactory = template.getConnectionFactory();
try {
//获取连接
assert connectionFactory != null;
Connection connection = connectionFactory.createConnection();
connection.start();
//获取session, true开启事务,false关闭事务
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(name);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(data);
//设置延迟时间 //AMQ_SCHEDULED_DELAY
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time * 1000L);
//发送
producer.send(message);
session.commit();
producer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
延时发送
import java.time.LocalDateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.config.ActiveMQHandler;
/**
 * REST endpoints that publish immediate and delayed messages through {@link ActiveMQHandler}.
 */
@RestController
public class Producter {

    /** Handler that performs the actual sends. */
    private final ActiveMQHandler handler;

    @Autowired
    public Producter(ActiveMQHandler handler) {
        this.handler = handler;
    }

    /**
     * Sends one immediate, timestamped message to the queue {@code queue01}.
     *
     * @return the literal {@code "success"}
     */
    @RequestMapping("/queue")
    public String queue() {
        final String payload = "我是消息内容, " + System.currentTimeMillis();
        handler.send("queue01", payload);
        return "success";
    }

    /**
     * Sends ten delayed messages to the queue {@code delaySend01}, each passed to the handler
     * with a delay argument of {@code 20}.
     *
     * @return the literal {@code "success"}
     */
    @RequestMapping("/delaySend")
    public String delaySend() {
        final String targetQueue = "delaySend01";
        final long delay = 20;

        for (int seq = 0; seq < 10; seq++) {
            // The payload carries the loop index immediately followed by the current local time.
            final String payload = "我是延迟消息内容, " + seq + LocalDateTime.now();
            handler.delaySend(targetQueue, payload, delay);
        }
        return "success";
    }
}
延时接收
import java.time.LocalDateTime;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
 * Consumer component with one listener for the immediate queue {@code queue01} and one for
 * the delayed queue {@code delaySend01}.
 */
@Slf4j
@Component
@EnableJms
public class Customer {

    /**
     * Handles a message from {@code queue01} by printing it, together with the time of
     * receipt, to standard error.
     *
     * @param msg text body of the received message
     */
    @JmsListener(destination = "queue01")
    public void customer(String msg) {
        final String line = msg + "queue01" + LocalDateTime.now() + "------------queue01";
        System.err.println(line);
    }

    /**
     * Handles a message from {@code delaySend01} by logging it together with the local time
     * of receipt, which can be compared with the send time contained in the payload.
     *
     * @param msg text body of the received message
     */
    @JmsListener(destination = "delaySend01")
    public void customer2(String msg) {
        final String line = "接收延时消息:" + msg + "local" + LocalDateTime.now();
        log.info(line);
    }
}