根据我们的使用场景抽取出来了一系列activemq公共配置参数mq.properties
mq.properties activemq.connnect.brokerurl=failover:(tcp://192.168.0.66:61616) activemq.connnect.useAsyncSend=true # object对象接受报名单,true不受限制,false需要设置白名单 activemq.connnect.trustAllPackages=true # 最大连接数 activemq.pool.maxConnections=20 # 空闲失效时间,毫秒 activemq.pool.idleTimeout=60000 # 初始数量 activemq.listener.pool.corePoolSize=5 activemq.listener.pool.maxPoolSize=10 # 启动守护进程 activemq.listener.pool.daemon=true # 单位秒 activemq.listener.pool.keepAliveSeconds=120 # 由于jms:listener-container不支持propertyPlaceholder替换,因此这些参数值写在spring-mq.xml文件中,参考值 # # 接收消息时的超时时间,单位毫秒 activemq.consumer.receiveTimeout=60000 # 监听目标类型 activemq.listener.destinationtype=queue # 监听确认消息方式 activemq.listener.acknowledge=auto # 监听数量 activemq.listener.concurrency=2-10
spring-mq.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 配置activeMQ连接 tcp://192.168.0.66:61616 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.connnect.brokerurl}" /> <!-- useAsyncSend 异步发送 --> <property name="useAsyncSend" value="${activemq.connnect.useAsyncSend}"></property> <!-- 关闭对象传输有白名单限制 --> <property name="trustAllPackages" value="${activemq.connnect.trustAllPackages}"></property> </bean> <!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化 这样可以大大减少我们的资源消耗, --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="${activemq.pool.maxConnections}" /> <property name="idleTimeout" value="${activemq.pool.idleTimeout}" /> <!-- maximumActiveSessionPerConnection : 500 每个连接中使用的最大活动会话数 --> <!-- idleTimeout : 30 * 1000 单位毫秒 --> <!-- blockIfSessionPoolIsFull : true --> <!-- blockIfSessionPoolIsFullTimeout : -1L --> <!-- expiryTimeout : 0L --> <!-- createConnectionOnStartup : true --> <!-- useAnonymousProducers : true --> <!-- reconnectOnException : true --> <!-- maxConnections : 默认1 --> <!-- timeBetweenExpirationCheckMillis : -1 --> </bean> <!-- 线程池配置 --> <bean id="queueMessagee x e cutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaske x e cutor"> <property name="corePoolSize" value="${activemq.listener.pool.corePoolSize}" /> <property name="maxPoolSize" value="${activemq.listener.pool.maxPoolSize}" /> <property name="daemon" value="${activemq.listener.pool.daemon}" /> <property name="keepAliveSeconds" value="${activemq.listener.pool.keepAliveSeconds}" /> </bean> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="pooledConnectionFactory" /> <!-- deliveryMode : PERSISTENT 默认保存消息 --> <!-- messageIdEnabled : true 默认有消息id --> <!-- messageTimestampEnabled : true 默认有消息发送时间 --> <!-- pubSubNoLocal : false,默认点对点(Queues) --> <!-- receiveTimeout : 0 阻塞接收不超时,接收消息时的超时时间,单位毫秒 --> <!-- deliveryDelay : 0 --> <!-- explicitQosEnabled : false --> <!-- priority : 4 --> <!-- timeToLive : 0 --> <!-- pubSubDomain : false --> <!-- defaultDestination : 默认目标,默认null --> <!-- messageConverter : 消息转换器,默认SimpleMessageConverter --> <!-- sessionTransacted : 事务控制,默认false --> </bean> <!-- 定义Queue监听器 --> <!-- 由于jms:listener-container不支持propertyPlaceholder替换,因此这些参数值写在spring-mq.xml文件中,参考值:mq.properties文件中 --> <jms:listener-container task-e x e cutor="queueMessagee x e cutor" receive-timeout="60000" destination-type="queue" container-type="default" connection-factory="pooledConnectionFactory" acknowledge="auto" concurrency="2-10" > <jms:listener destination="QUEUE.EMAIL" ref="mailMessageListener" /> <jms:listener destination="QUEUE.SMS" ref="smsMessageListener" /> </jms:listener-container> <bean id="smsMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <!-- 默认调用方法handleMessage --> <property name="delegate"> <bean class="com.domain.framework.message.sms.listener.SMSMessageListener" /> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean> <bean id="mailMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <!-- 默认调用方法handleMessage --> <property name="delegate"> <bean class="com.domain.framework.message.mail.listener.EmailMessageListener" /> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean> </beans>
配置说明
trustAllPackages
等于false时,在做object序列化时会有Class Not Found Exception:This class is not trusted to be serialized as ObjectMessage payload异常抛出,是因为activemq服务器默认是不接受object序列化对象,需要配置白名单(接受的object对象class全名)
等于true时关闭验证
传输对象安全说明: http://activemq.apache.org/objectmessage.htm
useAsyncSend
开启异步消息发送,主要是一个性能上的提升从而提升消息吞吐量,但是不能拿到消息发送后的回执消息,消息不会丢失
异步发送的说明:http://activemq.apache.org/async-sends.html
executor corePoolSize
该值的配置需要结合listener的个数和concurrency的数量去灵活配置
案例分析
<bean id="queueMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="2" /> <property name="maxPoolSize" value="10" /> <property name="daemon" value="true" /> <property name="keepAliveSeconds" value="120" /> </bean> <jms:listener-container task-executor="queueMessageExecutor" receive-timeout="60000" destination-type="queue" container-type="default" connection-factory="pooledConnectionFactory" acknowledge="auto" concurrency="2-10" > <jms:listener destination="QUEUE.EMAIL" ref="mailMessageListener" /> <jms:listener destination="QUEUE.SMS" ref="smsMessageListener" /> </jms:listener-container>
项目中有2个listener并且项目希望启动初始每个listener启动2个consumer最大10个consumer,如果e x e cutor corePoolSize配置为2,那么启动后只会给一个listener分配2个consumer,因为e x e cutor pool的初始配置数量不够,见下图
修改corePoolSize之后
<property name="corePoolSize" value="5" />
executor daemon
是否创建守护线程
设置为true时,在应用程序在紧急关闭时,任然会执行没有完成的runtime线程
jms:listener-container
由于不支持propertyPlaceholder替换,因此这些参数值写在spring-mq.xml文件中,参考值:mq.properties文件中
destination-type 目标类型(QUEUE, TOPIC, DURABLETOPIC)
acknowledge 消息确认方式(auto、client、dups-ok、transacted)
concurrency listener consumer个数
message-converter
消息转换器,我们这里不配置特殊的转换器,使用Spring提供的org.springframework.jms.support.converter.SimpleMessageConverter.SimpleMessageConverter()简单转换器,支持对象(String、byte[]、Map、Serializable)
结合org.springframework.jms.listener.adapter.MessageListenerAdapter做接受消息自动转换对象
结合org.springframework.jms.core.JmsTemplate使用convertAndSend系列方法对象转换并发送,实现发送消息自动转换。
我们为什么不使用json做消息转换,因为json转换在反序列话时需要明确序列化Class类型,丢失了消息转换器的通用性。
Listener
支持实现JMS接口的类javax.jms.MessageListener,它是一个来自JMS规范的标准化接口,但是你要处理线程。。
支持Spring SessionAwareMessageListener,这是一个Spring特定的接口,提供对JMS会话对象的访问。 这对于请求 - 响应消息传递非常有用。 只需要注意,你必须做自己的异常处理(即,重写handleListenerException方法,这样异常不会丢失)。
支持Spring MessageListenerAdapter,这是一个Spring特定接口,允许特定类型的消息处理。 使用此接口可避免代码中任何特定于JMS的依赖关系。
MessageListenerAdapter
可以代理任意POJO类,无需实现JMS接口,任意指定回调方法,并且消息转换内置实现,JMS会话默认封装 使用示例: 消息接收
<bean id="mailMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <!-- 默认调用方法handleMessage --> <property name="delegate"> <bean class="com.domain.framework.message.mail.listener.EmailMessageListener" /> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean> public class EmailMessageListener { public void receiveMessage(EmailMessageVo message) { ...someing.... } }
消息发送
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="pooledConnectionFactory" /> </bean> @Component("emailService") public class EmailServiceImpl implements IEmailService { @Autowired private JmsTemplate jmsTemplate; @Override public void sendEmailMessage(EmailMessageVo message) throws BizException { if(message != null) { jmsTemplate.convertAndSend(QueueNames.EMAIL, message); } else { logger.warn("sendEmailMessage() param[message] is null ,can't send message!"); } } }
ps.上面的示例主要是org.springframework.jms.core.JmsTemplate与org.springframework.jms.listener.adapter.MessageListenerAdapter和业务的POJO做消费者的一个结合使用示例,无需关注序列化,发送与接受对象直接使用业务POJO
Q名称的命名规则
名称我们采用大写字母,多个单词之间分隔符使用“.”,例如:QUEUE.XXX、TOPIC.XXX
根据产品线或项目名称增加namespace,例如:APP1.QUEUE.XXX、APP2.QUEUE.XXX
Active MQ包使用说明
不要使用activemq-all这个包,这个包打包了依赖(pool源码,spring源码,log4j源码,jms源码),会跟我们的日志框架产生冲突
我们使用activemq-pool、activemq-client、activemq-broker、spring-jms去替换上面的activemq-all包
Spring+Activemq使用配置非常灵活,我们不拘泥于一种形式,如果有更好的经验尽管提出来我们共同努力和进步。