ActiveMQ使用经验分享,配置详解
根据我们的使用场景抽取出来了一系列activemq公共配置参数mq.properties
mq.properties
activemq.connnect.brokerurl=failover:(tcp://192.168.0.66:61616)
activemq.connnect.useAsyncSend=true
# object对象接受报名单,true不受限制,false需要设置白名单
activemq.connnect.trustAllPackages=true
# 最大连接数
activemq.pool.maxConnections=20
# 空闲失效时间,毫秒
activemq.pool.idleTimeout=60000
# 初始数量
activemq.listener.pool.corePoolSize=5
activemq.listener.pool.maxPoolSize=10
# 启动守护进程
activemq.listener.pool.daemon=true
# 单位秒
activemq.listener.pool.keepAliveSeconds=120
# 由于jms:listener-container不支持propertyPlaceholder替换,因此这些参数值写在spring-mq.xml文件中,参考值
#
# 接收消息时的超时时间,单位毫秒
activemq.consumer.receiveTimeout=60000
# 监听目标类型
activemq.listener.destinationtype=queue
# 监听确认消息方式
activemq.listener.acknowledge=auto
# 监听数量
activemq.listener.concurrency=2-10
spring-mq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 配置activeMQ连接 tcp://192.168.0.66:61616 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.connnect.brokerurl}" />
<!-- useAsyncSend 异步发送 -->
<property name="useAsyncSend" value="${activemq.connnect.useAsyncSend}"></property>
<!-- 关闭对象传输有白名单限制 -->
<property name="trustAllPackages" value="${activemq.connnect.trustAllPackages}"></property>
</bean>
<!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化
这样可以大大减少我们的资源消耗, -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="${activemq.pool.maxConnections}" />
<property name="idleTimeout" value="${activemq.pool.idleTimeout}" />
<!-- maximumActiveSessionPerConnection : 500 每个连接中使用的最大活动会话数 -->
<!-- idleTimeout : 30 * 1000 单位毫秒 -->
<!-- blockIfSessionPoolIsFull : true -->
<!-- blockIfSessionPoolIsFullTimeout : -1L -->
<!-- expiryTimeout : 0L -->
<!-- createConnectionOnStartup : true -->
<!-- useAnonymousProducers : true -->
<!-- reconnectOnException : true -->
<!-- maxConnections : 默认1 -->
<!-- timeBetweenExpirationCheckMillis : -1 -->
</bean>
<!-- 线程池配置 -->
<bean id="queueMessagee x e cutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaske x e cutor">
<property name="corePoolSize" value="${activemq.listener.pool.corePoolSize}" />
<property name="maxPoolSize" value="${activemq.listener.pool.maxPoolSize}" />
<property name="daemon" value="${activemq.listener.pool.daemon}" />
<property name="keepAliveSeconds" value="${activemq.listener.pool.keepAliveSeconds}" />
</bean>
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="pooledConnectionFactory" />
<!-- deliveryMode : PERSISTENT 默认保存消息 -->
<!-- messageIdEnabled : true 默认有消息id -->
<!-- messageTimestampEnabled : true 默认有消息发送时间 -->
<!-- pubSubNoLocal : false,默认点对点(Queues) -->
<!-- receiveTimeout : 0 阻塞接收不超时,接收消息时的超时时间,单位毫秒 -->
<!-- deliveryDelay : 0 -->
<!-- explicitQosEnabled : false -->
<!-- priority : 4 -->
<!-- timeToLive : 0 -->
<!-- pubSubDomain : false -->
<!-- defaultDestination : 默认目标,默认null -->
<!-- messageConverter : 消息转换器,默认SimpleMessageConverter -->
<!-- sessionTransacted : 事务控制,默认false -->
</bean>
<!-- 定义Queue监听器 -->
<!-- 由于jms:listener-container不支持propertyPlaceholder替换,因此这些参数值写在spring-mq.xml文件中,参考值:mq.properties文件中 -->
<jms:listener-container task-e x e cutor="queueMessagee x e cutor" receive-timeout="60000"
destination-type="queue" container-type="default" connection-factory="pooledConnectionFactory"
acknowledge="auto" concurrency="2-10" >
<jms:listener destination="QUEUE.EMAIL" ref="mailMessageListener" />
<jms:listener destination="QUEUE.SMS" ref="smsMessageListener" />
</jms:listener-container>
<bean id="smsMessageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<!-- 默认调用方法handleMessage -->
<property name="delegate">
<bean class="com.domain.framework.message.sms.listener.SMSMessageListener" />
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
</bean>
<bean id="mailMessageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<!-- 默认调用方法handleMessage -->
<property name="delegate">
<bean class="com.domain.framework.message.mail.listener.EmailMessageListener" />
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
</bean>
</beans>
配置说明
- trustAllPackages
- 等于false时,在做object序列化时会有Class Not Found Exception:This class is not trusted to be serialized as ObjectMessage payload异常抛出,是因为activemq服务器默认是不接受object序列化对象,需要配置白名单(接受的object对象class全名)
- 等于true时关闭验证
- 传输对象安全说明: http://activemq.apache.org/objectmessage.htm
- useAsyncSend
- 开启异步消息发送,主要是一个性能上的提升从而提升消息吞吐量,但是不能拿到消息发送后的回执消息,消息不会丢失
- 异步发送的说明:http://activemq.apache.org/async-sends.html
executor corePoolSize
- 该值的配置需要结合listener的个数和concurrency的数量去灵活配置
案例分析
<bean id="queueMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="2" /> <property name="maxPoolSize" value="10" /> <property name="daemon" value="true" /> <property name="keepAliveSeconds" value="120" /> </bean> <jms:listener-container task-executor="queueMessageExecutor" receive-timeout="60000" destination-type="queue" container-type="default" connection-factory="pooledConnectionFactory" acknowledge="auto" concurrency="2-10" > <jms:listener destination="QUEUE.EMAIL" ref="mailMessageListener" /> <jms:listener destination="QUEUE.SMS" ref="smsMessageListener" /> </jms:listener-container>
项目中有2个listener并且项目希望启动初始每个listener启动2个consumer最大10个consumer,如果e x e cutor corePoolSize配置为2,那么启动后只会给一个listener分配2个consumer,因为e x e cutor pool的初始配置数量不够,见下图
修改corePoolSize之后
<property name="corePoolSize" value="5" />
- 该值的配置需要结合listener的个数和concurrency的数量去灵活配置
executor daemon
- 是否创建守护线程
- 设置为true时,在应用程序在紧急关闭时,任然会执行没有完成的runtime线程
jms:listener-container
- 由于不支持propertyPlaceholder替换,因此这些参数值写在spring-mq.xml文件中,参考值:mq.properties文件中
- destination-type 目标类型(QUEUE, TOPIC, DURABLETOPIC)
- acknowledge 消息确认方式(auto、client、dups-ok、transacted)
- concurrency listener consumer个数
message-converter
- 消息转换器,我们这里不配置特殊的转换器,使用Spring提供的org.springframework.jms.support.converter.SimpleMessageConverter.SimpleMessageConverter()简单转换器,支持对象(String、byte[]、Map、Serializable)
- 结合org.springframework.jms.listener.adapter.MessageListenerAdapter做接受消息自动转换对象
- 结合org.springframework.jms.core.JmsTemplate使用convertAndSend系列方法对象转换并发送,实现发送消息自动转换。
- 我们为什么不使用json做消息转换,因为json转换在反序列话时需要明确序列化Class类型,丢失了消息转换器的通用性。
Listener
- 支持实现JMS接口的类javax.jms.MessageListener,它是一个来自JMS规范的标准化接口,但是你要处理线程。。
- 支持Spring SessionAwareMessageListener,这是一个Spring特定的接口,提供对JMS会话对象的访问。 这对于请求 - 响应消息传递非常有用。 只需要注意,你必须做自己的异常处理(即,重写handleListenerException方法,这样异常不会丢失)。
- 支持Spring MessageListenerAdapter,这是一个Spring特定接口,允许特定类型的消息处理。 使用此接口可避免代码中任何特定于JMS的依赖关系。
MessageListenerAdapter
- 可以代理任意POJO类,无需实现JMS接口,任意指定回调方法,并且消息转换内置实现,JMS会话默认封装
使用示例:
消息接收
<bean id="mailMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <!-- 默认调用方法handleMessage --> <property name="delegate"> <bean class="com.domain.framework.message.mail.listener.EmailMessageListener" /> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean> public class EmailMessageListener { public void receiveMessage(EmailMessageVo message) { ...someing.... } }
消息发送
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="pooledConnectionFactory" /> </bean> @Component("emailService") public class EmailServiceImpl implements IEmailService { @Autowired private JmsTemplate jmsTemplate; @Override public void sendEmailMessage(EmailMessageVo message) throws BizException { if(message != null) { jmsTemplate.convertAndSend(QueueNames.EMAIL, message); } else { logger.warn("sendEmailMessage() param[message] is null ,can't send message!"); } } }
ps.上面的示例主要是org.springframework.jms.core.JmsTemplate与org.springframework.jms.listener.adapter.MessageListenerAdapter和业务的POJO做消费者的一个结合使用示例,无需关注序列化,发送与接受对象直接使用业务POJO
- 可以代理任意POJO类,无需实现JMS接口,任意指定回调方法,并且消息转换内置实现,JMS会话默认封装
使用示例:
消息接收
Q名称的命名规则
- 名称我们采用大写字母,多个单词之间分隔符使用“.”,例如:QUEUE.XXX、TOPIC.XXX
- 根据产品线或项目名称增加namespace,例如:APP1.QUEUE.XXX、APP2.QUEUE.XXX
Active MQ包使用说明
- 不要使用activemq-all这个包,这个包打包了依赖(pool源码,spring源码,log4j源码,jms源码),会跟我们的日志框架产生冲突
- 我们使用activemq-pool、activemq-client、activemq-broker、spring-jms去替换上面的activemq-all包
Spring+Activemq使用配置非常灵活,我们不拘泥于一种形式,如果有更好的经验尽管提出来我们共同努力和进步。