


ln -s /usr/local/activemq/bin/activemq ./


chkconfig activemq on


service activemq start
service activemq status
service activemq stop



 <transportConnector name="openwire" uri="tcp://;wireFormat.maxFrameSize=104857600"/>




		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

Auto + Nio

<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>


OpenWire 可用配置选项

cacheEnabledtrueShould commonly repeated values be cached so that less marshaling occurs?
cacheSize1024When cacheEnabled=true then this parameter is used to specify the number of values to be cached.
maxInactivityDuration30000The maximum inactivityopen in new window duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to die. Therefore allow the broker to kill connections when they have been inactive for the configured period of time. Used by some transports to enable a keep alive heart beat feature. Inactivity monitoring is disabled when set to a value <= 0.
maxInactivityDurationInitalDelay10000The initial delay before starting inactivityopen in new window checks. Yes, the word 'Inital' is supposed to be misspelled like that.
maxFrameSizeMAX_LONGMaximum allowed frame size. Can help help prevent OOM DOS attacks.
sizePrefixDisabledfalseShould the size of the packet be prefixed before each packet is marshaled?
stackTraceEnabledtrueShould the stack trace of exception that occur on the broker be sent to the client?
tcpNoDelayEnabledtrueDoes not affect the wire format, but provides a hint to the peer that TCP_NODELAY should be enabled on the communications Socket.
tightEncodingEnabledtrueShould wire size be optimized over CPU usage?

Transport 可用配置选项

Option NameDefault ValueDescription
backlog5000Specifies the maximum number of connections waiting to be accepted by the transport server socket.
closeAsynctrueIf true the socket close call happens asynchronously. This parameter should be set to false for protocols like STOMP, that are commonly used in situations where a new connection is created for each read or write. Doing so ensures the socket close call happens synchronously. A synchronous close prevents the broker from running out of available sockets owing to the rapid cycling of connections.
connectionTimeout30000If >=1 the value sets the connection timeout in milliseconds. A value of 0 denotes no timeout. Negative values are ignored.
daemonfalseIf true the transport thread will run in daemon mode. Set this parameter to true when embedding the broker in a Spring container or a web container to allow the container to shut down correctly.
dynamicManagementfalseIf true the TransportLogger can be managed by JMX.
ioBufferSize8 * 1024Specifies the size of the buffer to be used between the TCP layer and the OpenWire layer where wireFormat based marshaling occurs.
jmxPort1099(Client Only) Specifies the port that will be used by the JMX server to manage the TransportLoggers. This should only be set, via URI, by either a client producer or consumer as the broker creates its own JMX server. Specifying an alternate JMX port is useful for developers that test a broker and client on the same machine and need to control both via JMX.
keepAlivefalseIf true, enables TCP KeepAliveopen in new window on the broker connection to prevent connections from timing out at the TCP level. This should not be confused with KeepAliveInfo messages as used by the InactivityMonitor.
logWriterNamedefaultSets the name of the org.apache.activemq.transport.LogWriter implementation to use. Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
maximumConnectionsInteger.MAX_VALUEThe maximum number of sockets allowed for this broker.
minmumWireFormatVersion0The minimum remote wireFormat version that will be accepted (note the misspelling). Note: when the remote wireFormat version is lower than the configured minimum acceptable version an exception will be thrown and the connection attempt will be refused. A value of 0 denotes no checking of the remote wireFormat version.
socketBufferSize64 * 1024Sets the size, in bytes, for the accepted socket’s read and write buffers.
soLingerInteger.MIN_VALUESets the socket’s option soLinger when the value is > -1. When set to -1 the soLinger socket option is disabled.
soTimeout0Sets the socket’s read timeout in milliseconds. A value of 0 denotes no timeout.
soWriteTimeout0Sets the socket’s write timeout in milliseconds. If the socket write operation does not complete before the specified timeout, the socket will be closed. A value of 0 denotes no timeout.
stackSize0Set the stack size of the transport’s background reading thread. Must be specified in multiples of 128K. A value of 0 indicates that this parameter is ignored.
startLoggingtrueIf true the TransportLogger object of the Transport stack will initially write messages to the log. This parameter is ignored unless trace=true.
tcpNoDelayfalseIf true the socket’s option TCP_NODELAY is set. This disables Nagle’s algorithm for small packet transmission.
threadNameN/AWhen this parameter is specified the name of the thread is modified during the invocation of a transport. The remote address is appended so that a call stuck in a transport method will have the destination information in the thread name. This is extremely useful when using thread dumps for degugging.
tracefalseCauses all commands that are sent over the transport to be logged. To view the logged output define the Log4j logger: log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUG.
trafficClass0The Traffic Class to be set on the socket.
diffServ0(Client only) The preferred Differentiated Services traffic class to be set on outgoing packets, as described in RFC 2475. Valid integer values: [0,64]. Valid string values: EF, AF[1-3][1-4] or CS[0-7]. With JDK 6, only works when the JVM uses the IPv4 stack. To use the IPv4 stack set the system property java.net.preferIPv4Stack=true. Note: it’s invalid to specify both ‘diffServ and typeOfService’ at the same time as they share the same position in the TCP/IP packet headers
typeOfService0(Client only) The preferred Type of Service value to be set on outgoing packets. Valid integer values: [0,256]. With JDK 6, only works when the JVM is configured to use the IPv4 stack. To use the IPv4 stack set the system property java.net.preferIPv4Stack=true. Note: it’s invalid to specify both ‘diffServ and typeOfService’ at the same time as they share the same position in the TCP/IP packet headers.
useInactivityMonitortrueWhen false the InactivityMonitor is disabled and connections will never time out.
useKeepAlivetrueWhen true KeepAliveInfo messages are sent on an idle connection to prevent it from timing out. If this parameter is false connections will still timeout if no data was received on the connection for the specified amount of time.
useLocalHostfalseWhen true local connections will be made using the value localhost instead of the actual local host name. On some operating systems, such as OS X, it’s not possible to connect as the local host name so localhost is better.
useQueueForAccepttrueWhen true accepted sockets are placed onto a queue for asynchronous processing using a separate thread.
wireFormatdefaultThe name of the wireFormat factory to use.
wireFormat.*N/AProperties with this prefix are used to configure the wireFormat.

ActiveMQ服务监控 Hawtio





java -jar



  • 下载war包
  • 复制到webapps下

jetty.xml bean标签下添加

				<bean class="org.eclipse.jetty.webapp.WebAppContext">        
					<property name="contextPath" value="/hawtio" />        
					<property name="war" value="${activemq.home}/webapps/hawtio.war" />        
					<property name="logUrlOnStart" value="true" />  


if "%ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xms1G -Xmx1G -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config="%ACTIVEMQ_CONF%\login.config" 


Message主要由三部分组成,分别是Header,Properties,Body, 详细如下:



public interface Message {
    public Destination getJMSDestination() throws JMSException;
    public void setJMSDestination(Destination destination) throws JMSException;
    public int getJMSDeliveryMode() throws JMSException
    public void setJMSDeliveryMode(int deliveryMode) throws JMSException;
    public String getJMSMessageID() throws JMSException;
    public void setJMSMessageID(String id) throws JMSException;
    public long getJMSTimestamp() throws JMSException'
    public void setJMSTimestamp(long timestamp) throws JMSException;
    public long getJMSExpiration() throws JMSException;
    public void setJMSExpiration(long expiration) throws JMSException;
    public boolean getJMSRedelivered() throws JMSException;
    public void setJMSRedelivered(boolean redelivered) throws JMSException;
    public int getJMSPriority() throws JMSException;
    public void setJMSPriority(int priority) throws JMSException;
    public Destination getJMSReplyTo() throws JMSException;
    public void setJMSReplyTo(Destination replyTo) throws JMSException;
    public String getJMScorrelationID() throws JMSException;
    public void setJMSCorrelationID(String correlationID) throws JMSException;
    public byte[] getJMSCorrelationIDAsBytes() throws JMSException;
    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
    public String getJMSType() throws JMSException;
    public void setJMSType(String type) throws JMSException;




JMSDeliveryMode消息的发送模式,分为NON_PERSISTENTPERSISTENT,即非持久性模式的和持久性模式。默认设置为PERSISTENT(持久性)。一条持久性消息应该被传送一次(就一次),这就意味着如果JMS提供者出现故障,该消息并不会丢失; 它会在服务器恢复正常之后再次传送。一条非持久性消息最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失。在持久性和非持久性这两种传送模式中,消息服务器都不会将一条消息向同一消息者发送一次以上(成功算一次)。send
JMSRedelivered消息是否重复发送过,如果该消息之前发送过,那么这个属性的值需要被设置为true, 客户端可以根据这个属性的值来确认这个消息是否重复发送过,以避免重复处理。Provider


MessageProducer producer = session.createProducer(topic);


		//将过期时间设置为1小时(1000毫秒 *60 *60)
		producer.setTimeToLive(1000 * 60 * 60);




JMSReplyTo消息回复的目的地,其值为一个Topic或Queue, 这个由发送者设置,但是接收者可以决定是否响应client



下一代 ActiveMQ 6?Artemis


  • 包含JNDI,具有完整的JMS 1.1 & 2.0客户端实现
  • 高可用性共享存储、网络复制能力
  • 简单而强大的寻址模型协议
  • 灵活的负载均衡分配能力
  • 针对低延迟持久性和JDBC的高级日志实现
  • 与ActiveMQ 5的高功能奇偶校验,以简化迁移


  • netty
  • 自己的存储
  • 优化传输流程
  • 更高的性能
  • 不再把所有的协议转换成openwire






ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。

ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。













map *ConcurrentHashMap* -> putIfAbsent guava cache


queue 优先级别设置

多消费端 ->