Language/Java

RabbitMQ Producer & Consumer

아르비스 2014. 11. 11. 13:26

SpringFramework를 이용하여 RabbitMQ의 Producer와 Consumer를 구현하는 sample Code


POM에 amqp library 추가 하기


           <!-- Spring-amqp -->

   <dependency>

       <groupId>org.springframework.amqp</groupId>

       <artifactId>spring-rabbit</artifactId>

       <version>1.3.5.RELEASE</version>

   </dependency> 

            <!-- mysql Connector -->

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>5.1.24</version>

</dependency>


Spring context configuration


        <rabbit:connection-factory 

    id="connectionFactory" 

    addresses="#{resources['resource.rabbitmq.addressses']}" 

    username="#{resources['resource.rabbitmq.username']}" 

    password="#{resources['resource.rabbitmq.password']}" 

    channel-cache-size="#{resources['resource.rabbitmq.channel']}" />

 

 

  <!-- MQ PRODUCER -->

  <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>



<!-- MQ Listener -->

  <rabbit:queue id="ProcQueue" 

name="#{resources['resource.rabbitmq.listener.queue.name']}" />

<rabbit:listener-container

connection-factory="connectionFactory" 

concurrency="#{resources['resource.rabbitmq.listener.concurrency']}"

channel-transacted="true" 

transaction-manager="transactionManager">

<rabbit:listener queues="ProcQueue" 

ref="#{resources['resource.rabbitmq.listener.queue.classname']}" />

</rabbit:listener-container>

<!-- sample consumer class -->

<bean id="sampleConsumer" class="com.sds.ioffice.wsp.cmm.service.RabbitMqConsumerServiceImpl" />


<properties>

#**** RABBIT MQ SETTING VALUE ********

resource.rabbitmq.addressses=**.**.**.***:5672

resource.rabbitmq.username=efss

resource.rabbitmq.password=efss00

resource.rabbitmq.channel=25


#**** RABBIT MQ SETTING VALUE CONSUMER ********

resource.rabbitmq.listener.queue.name=Sample.Queue

resource.rabbitmq.listener.concurrency=1

resource.rabbitmq.listener.queue.classname=sampleConsumer


Producer


import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
 
public class DemoRabbitProducer {
 
   @Autowired
   RabbitTemplate rabbitTemplate;
 
   public void sendMessage(String msg)
         throws Exception {
 
      MessageProperties props = new MessageProperties();
 
      Message message = new Message(msg, props);
 
      rabbitTemplate.convertAndSend(message);
   }
public void execute(String exchangeName, Map msg) { amqpTemplate.convertAndSend(exchangeName, "", msg); } public void execute(String exchangeName, String json) { amqpTemplate.convertAndSend(exchangeName, "", json); }


 
}


Consumer (Listener)


import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessageListener;

import org.springframework.amqp.support.converter.SimpleMessageConverter;

import org.springframework.stereotype.Service;


import com.sds.ioffice.runtime.utils.JsonUtil;


public class RabbitMqConsumerServiceImpl implements MessageListener {

@Override

public void onMessage(Message message) {

logger.info("Consumer Message : {}" , message.toString());

SimpleMessageConverter messageConverter = new SimpleMessageConverter();

String messageData = (String) messageConverter.fromMessage(message);

Map<String, Object> map = JsonUtil.convertJsonToMap(messageData);

logger.debug("message : {}",map);

/*for (Entry<String, Object> item : map.entrySet()) {

logger.debug("Key : {} , Value : {}", item.getKey(), item.getValue());

}*/

}

}



Consumer는 내부에 Thread로 실행되므로, 생성 후 process가 종료되지 않으면, 계속 pork 해줌.



channel-transacted="false"  로 설정할 경우, transactionManager는 불필요함