Sample code implementing a RabbitMQ Producer and Consumer with the Spring Framework
Adding the AMQP library to the POM
<!-- Spring-amqp -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>
<!-- mysql Connector -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.24</version>
</dependency>
Spring context configuration
<rabbit:connection-factory id="connectionFactory"
    addresses="#{resources['resource.rabbitmq.addressses']}"
    username="#{resources['resource.rabbitmq.username']}"
    password="#{resources['resource.rabbitmq.password']}"
    channel-cache-size="#{resources['resource.rabbitmq.channel']}" />

<!-- MQ PRODUCER -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

<!-- MQ Listener -->
<rabbit:queue id="ProcQueue" name="#{resources['resource.rabbitmq.listener.queue.name']}" />

<rabbit:listener-container connection-factory="connectionFactory"
    concurrency="#{resources['resource.rabbitmq.listener.concurrency']}"
    channel-transacted="true"
    transaction-manager="transactionManager">
    <rabbit:listener queues="ProcQueue" ref="#{resources['resource.rabbitmq.listener.queue.classname']}" />
</rabbit:listener-container>

<!-- sample consumer class -->
<bean id="sampleConsumer" class="com.sds.ioffice.wsp.cmm.service.RabbitMqConsumerServiceImpl" />
<properties>
#**** RABBIT MQ SETTING VALUE ********
resource.rabbitmq.addressses=**.**.**.***:5672
resource.rabbitmq.username=efss
resource.rabbitmq.password=efss00
resource.rabbitmq.channel=25
#**** RABBIT MQ SETTING VALUE CONSUMER ********
resource.rabbitmq.listener.queue.name=Sample.Queue
resource.rabbitmq.listener.concurrency=1
resource.rabbitmq.listener.queue.classname=sampleConsumer
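The `addresses` attribute of the connection factory accepts a comma-separated list of `host:port` entries, so the `resource.rabbitmq.addressses` property can name more than one broker. The following is a minimal JDK-only sketch of how such a value breaks down. The class `RabbitAddressSketch`, its methods, and the `mq1.example.com` / `mq2.example.com` hosts are hypothetical and exist only for illustration; this is not how Spring AMQP itself parses the value, and falling back to 5672 (the standard AMQP port) for an entry without a port is an assumption of the sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Spring AMQP.
class RabbitAddressSketch {

    // Loads properties from text in the same key=value format as the file above
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Splits "host1:port1,host2:port2" into {host, port} pairs.
    // Assumption: an entry without a port falls back to 5672.
    static List<String[]> parseAddresses(String addresses) {
        List<String[]> result = new ArrayList<String[]>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            String port = colon < 0 ? "5672" : trimmed.substring(colon + 1);
            result.add(new String[] { host, port });
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = load(
            "resource.rabbitmq.addressses=mq1.example.com:5672,mq2.example.com:5672\n");
        for (String[] address : parseAddresses(props.getProperty("resource.rabbitmq.addressses"))) {
            System.out.println(address[0] + " on port " + address[1]);
        }
    }
}
```

Note that the property key keeps the original spelling `addressses`, because the context configuration looks it up under exactly that name.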
Producer
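import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class DemoRabbitProducer {

    // Injected by type; resolves to the <rabbit:template id="amqpTemplate"> bean
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Sends a prebuilt Message to the template's default exchange and routing key
    public void sendMessage(String msg) {
        MessageProperties props = new MessageProperties();
        // Message takes a byte[] body, so the string is encoded explicitly
        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), props);
        rabbitTemplate.send(message);
    }

    // Converts the Map with the template's message converter and publishes it
    // to the given exchange with an empty routing key
    public void execute(String exchangeName, Map<String, Object> msg) {
        rabbitTemplate.convertAndSend(exchangeName, "", msg);
    }

    // Publishes a JSON string to the given exchange with an empty routing key
    public void execute(String exchangeName, String json) {
        rabbitTemplate.convertAndSend(exchangeName, "", json);
    }
}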
Consumer (Listener)
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.support.converter.SimpleMessageConverter;

import com.sds.ioffice.runtime.utils.JsonUtil;

public class RabbitMqConsumerServiceImpl implements MessageListener {

    // Assumption: SLF4J is the logging API (the original omitted the logger declaration)
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConsumerServiceImpl.class);

    @Override
    public void onMessage(Message message) {
        logger.info("Consumer Message : {}", message);

        // Assumes a text payload; fromMessage returns a String only for text content types
        SimpleMessageConverter messageConverter = new SimpleMessageConverter();
        String messageData = (String) messageConverter.fromMessage(message);

        Map<String, Object> map = JsonUtil.convertJsonToMap(messageData);
        logger.debug("message : {}", map);
        /*for (Entry<String, Object> item : map.entrySet()) {
            logger.debug("Key : {} , Value : {}", item.getKey(), item.getValue());
        }*/
    }
}
The Consumer runs on an internal thread, so once it has been created it keeps receiving messages from the queue for as long as the process is not terminated.
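The effect of a long-lived consumer thread can be illustrated without a broker. The following is a minimal JDK-only sketch, not Spring AMQP's actual implementation: the class `ConsumerLoopSketch` and all of its methods are hypothetical names, an in-memory `BlockingQueue` stands in for the RabbitMQ queue, and appending to a list stands in for the `onMessage` call. It shows a non-daemon worker thread that keeps handling messages until it is explicitly stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not Spring AMQP code.
class ConsumerLoopSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    private final List<String> received = new CopyOnWriteArrayList<String>();
    private volatile boolean running = true;

    // Non-daemon worker thread: the JVM stays alive while it runs
    private final Thread worker = new Thread(new Runnable() {
        @Override
        public void run() {
            while (running) {
                try {
                    // Wait briefly for a message, then re-check the running flag
                    String msg = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        received.add(msg); // stands in for onMessage(...)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }, "consumer-sketch");

    void start() { worker.start(); }

    void publish(String msg) { queue.add(msg); }

    List<String> received() { return new ArrayList<String>(received); }

    boolean isAlive() { return worker.isAlive(); }

    // Blocks until `count` messages have arrived or the timeout elapses
    boolean awaitReceived(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (received.size() < count) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Clears the running flag and waits up to two seconds for the worker to exit
    void stop() {
        running = false;
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ConsumerLoopSketch consumer = new ConsumerLoopSketch();
        consumer.start();
        consumer.publish("hello");
        consumer.awaitReceived(1, 1000);
        System.out.println(consumer.received());
        consumer.stop(); // without this call the worker would keep the JVM running
    }
}
```

In the real application the listener container owns the consumer threads, so the equivalent of `stop()` is shutting down the Spring context rather than calling a method on the listener.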
If channel-transacted="false" is set, the transactionManager is not needed.