基于spring amqp rabbitmq fanout配置如下:
发布端
<rabbit:connection-factory id="rabbitConnectionFactory" username="guest" password="guest" host="localhost" port="5672"/>
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" exchange="fanout-mq-exchange" message-converter="jsonMessageConverter"/> <rabbit:admin id="admin" connection-factory="rabbitConnectionFactory"/> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <rabbit:queue id="fanout_queue" name="fanout_queue" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="fanout_queue1" name="fanout_queue1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="fanout_queue2" name="fanout_queue2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:fanout-exchange id="fanout-mq-exchange" name="fanout-mq-exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="fanout_queue"/> <rabbit:binding queue="fanout_queue1"/> <rabbit:binding queue="fanout_queue2"/> </rabbit:bindings> </rabbit:fanout-exchange>/**
* @Title: MQProducerImpl.java * @Package com.cyl.rabbitmq * @Description: TODO(用一句话描述该文件做什么) * @author zjhua@hundsun.com * @date 2016年4月25日 下午1:12:46 * @version V1.0 */ package com.cyl.rabbitmq;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;/**
* @author zjhua * */@Servicepublic class MQProducerImpl implements MQProducer {@Autowired
private AmqpTemplate amqpTemplate;/* (non-Javadoc)
* @see com.cyl.rabbitmq.MQProducer#sendDataToQueue(java.lang.String, java.lang.Object) */ @Override public void sendDataToQueue(String queueKey, Object object) { try { amqpTemplate.convertAndSend("fanout-mq-exchange",null,object); } catch (Exception e) { e.printStackTrace(); } }}消费端
<rabbit:connection-factory id="rabbitConnectionFactory" username="guest" password="guest" host="localhost" port="5672"/>
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" exchange="fanout-mq-exchange" message-converter="jsonMessageConverter"/> <rabbit:admin id="admin" connection-factory="rabbitConnectionFactory"/> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- <rabbit:queue id="fanout_queue" name="fanout_queue" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="fanout_queue1" name="fanout_queue1" durable="true" auto-delete="false" exclusive="false" /> --><rabbit:queue id="fanout_queue2" name="fanout_queue2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:fanout-exchange id="fanout-mq-exchange" name="fanout-mq-exchange" durable="true" auto-delete="false"> <rabbit:bindings> <!-- <rabbit:binding queue="fanout_queue"/> <rabbit:binding queue="fanout_queue1"/> --> <rabbit:binding queue="fanout_queue2"/> </rabbit:bindings> </rabbit:fanout-exchange> <!-- <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*" /> </rabbit:bindings> </rabbit:topic-exchange> <rabbit:listener-container id="fanout1" connection-factory="rabbitConnectionFactory" acknowledge="auto"> <rabbit:listener queues="fanout_queue1" ref="queueListener1"/> </rabbit:listener-container> --> <rabbit:listener-container id="fanout2" connection-factory="rabbitConnectionFactory" acknowledge="auto"> <rabbit:listener queues="fanout_queue2" ref="queueListener2"/> </rabbit:listener-container> <!-- <bean id="mqProducer" class="com.cyl.rabbitmq.MQProducerImpl"></bean> <bean id="queueListener1" class="com.cyl.rabbitmq.QueueListenter"></bean> --> <bean id="queueListener2" class="com.cyl.rabbitmq.QueueListenter"></bean>/**
* @Title: QueueListenter.java * @Package com.cyl.rabbitmq * @Description: TODO(用一句话描述该文件做什么) * @author zjhua@hundsun.com * @date 2016年4月25日 下午1:15:31 * @version V1.0 */ package com.cyl.rabbitmq;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;/**
* @author zjhua * */public class QueueListenter implements MessageListener {/* (non-Javadoc)
* @see org.springframework.amqp.core.MessageListener#onMessage(org.springframework.amqp.core.Message) */ @Override public void onMessage(Message msg) { System.out.println(new String(msg.getBody())); }}事实上这种配置仅适用于中小型规模、服务器数量能够提前预估的环境,对于服务器数量不可提前确定或者经常可能发生变化的环境,并不适合采用配置式,而是在容器启动时进行动态注册。并且queue的属性应该为exclusive+auto-delete。