ElasticSearch를 좀 더 편하게 사용하기 위해
Flume과 Kafka를 함께 사용하게 됨.
Kafka Queue에 등록된 Event(Message)를 Flume을 통해 ElasticSearch에 저장하는 process
1. Flume 설치
- Site : https://flume.apache.org/index.html
1) Download
# wget https://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
(closer.lua 주소는 mirror 선택용 HTML 페이지를 내려받으므로, 실제 tar.gz 파일 경로를 사용)
2) unpack
# tar -zxvf apache-flume-1.6.0-bin.tar.gz
3) Source, Sink 유형 참조
https://flume.apache.org/FlumeUserGuide.html
4) flume 환경 설정
# cd flume/conf
# cp flume-conf.properties.template flume-conf.properties
# vi flume-conf.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
#
# NOTE: .properties files have no inline comments. Anything written after a
# value on the same line (e.g. "// ...") becomes part of that value, so every
# annotation below is on its own '#' line.
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
# (source type)
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
# (binds the source to the channel)
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
# (sink type)
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
# (binds the sink to the channel)
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
# (channel type)
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100
이 파일에서 Source, Channel, Sink를 어떻게 설정하느냐에 따라 다양한 Flow를 구성할 수 있음
ES 연결을 위해 다음처럼 변경 (Source를 Kafka로 바꾸고, Sink는 우선 logger로 두어 Kafka → Flume 흐름을 확인한 뒤 ElasticSearch sink로 교체)
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

# agent components
agent.sources = src1
agent.channels = ch1
agent.sinks = sink1

# For each one of the sources, the type is defined
#agent.sources.seqGenSrc.type = seq

# kafka source config
agent.sources.src1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.src1.topic = TestAction
agent.sources.src1.groupId = index
agent.sources.src1.zookeeperConnect = 127.0.0.1:2181
agent.sources.src1.kafka.consumer.timeout.ms = 100
# The Kafka source puts up to batchSize events into the channel in a single
# transaction (Flume 1.6 default: 1000). That must not exceed
# ch1.transactionCapacity below, or the memory channel rejects the put, so
# pin it to the same value.
agent.sources.src1.batchSize = 100

# The channel can be defined as follows.
#agent.sources.seqGenSrc.channels = memoryChannel
# Bind the source to the channel
agent.sources.src1.channels = ch1

# channel for ES
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
agent.channels.ch1.transactionCapacity = 100

# Bind the sink to the channel
agent.sinks.sink1.channel = ch1

# Describe the sink
# NOTE(review): the logger sink only writes events to the agent log; it looks
# like a placeholder until the ElasticSearch sink is configured.
agent.sinks.sink1.type = logger
5) 실행
# ./bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-conf.properties --name agent
(--name 값은 설정 파일의 agent 이름, 즉 각 property 앞의 `agent.` 와 같아야 함. 다르면 해당 설정이 적용되지 않음)
※ 실행 script
# vi flume
#!/bin/bash
#
# flume - start/stop a single Flume NG agent as a background daemon.
#
# Usage:
#   flume start <agent> <conf dir> <conf file> <jmx port> <monitoring port>
#   flume stop  <agent>
#
#   <agent>            agent name; must match the prefix used in the
#                      properties file (e.g. "agent" for "agent.sources = ...")
#   <conf dir>         configuration directory, relative to FLUME_HOME
#   <conf file>        properties file name inside <conf dir>
#   <jmx port>         accepted for argument compatibility; NOT passed to the
#                      JVM by this script
#   <monitoring port>  port for Flume's HTTP monitoring endpoint
#
# This is a bash script: run it as `bash flume ...` or `./flume ...`, not
# `sh flume ...`.

FLUME_HOME=/home/flume/flume-1.6.0
FLUME_LOG_DIR=${FLUME_HOME}/log
FLUME_CONF_DIR=${FLUME_HOME}/$3
FLUME_CONF_FILE=${FLUME_CONF_DIR}/$4
EXEC_PATH=${FLUME_HOME}/bin/flume-ng
FLUME_AGENT_NAME=$2
FLUME_PID_FILE=${FLUME_HOME}/flume-ng-${FLUME_AGENT_NAME}.pid
FLUME_JMX_PORT=$5 # currently unused (see header)
FLUME_MONITORING_PORT=$6
FLUME_CLASSPATH=${FLUME_HOME}/conf
desc="Flume NG agent daemon"

# Print usage to stderr and exit non-zero.
usage() {
  echo "Usage: $0 start <agent name> <conf dir> <conf file> <jmx port> <monitoring port>" >&2
  echo "       $0 stop <agent name>" >&2
  exit 1
}

# Succeeds when the PID file exists and names a live process.
is_running() {
  local pid
  [ -f "$FLUME_PID_FILE" ] || return 1
  pid=$(cat "$FLUME_PID_FILE")
  [ -n "$pid" ] && ps -p "$pid" >/dev/null 2>&1
}

# Launch the agent in the background and record its PID.
start() {
  if is_running; then
    echo "Flume agent is running; please, stop it first"
    exit 0
  fi
  if [ ! -f "$FLUME_CONF_FILE" ]; then
    echo "Flume configuration file not found: $FLUME_CONF_FILE" >&2
    exit 1
  fi

  # A PID file whose process is gone is left over from a crash; discard it
  # instead of refusing to start forever.
  rm -f -- "$FLUME_PID_FILE"
  # The output redirection below fails if the log directory is missing.
  mkdir -p -- "$FLUME_LOG_DIR"

  # Every expansion is quoted, so paths containing spaces survive (the old
  # nested `bash -c "..."` string did not). `$!` is the PID of the flume-ng
  # process itself, matching what the old `echo $$ && exec` trick recorded.
  "$EXEC_PATH" agent \
    --classpath "$FLUME_CLASSPATH" \
    --conf "$FLUME_CONF_DIR" \
    --conf-file "$FLUME_CONF_FILE" \
    --name "$FLUME_AGENT_NAME" \
    -Dflume.monitoring.type=http \
    -Dflume.monitoring.port="$FLUME_MONITORING_PORT" \
    -Duser.timezone=GMT \
    >>"${FLUME_LOG_DIR}/${FLUME_AGENT_NAME}.init.log" 2>&1 &
  echo "$!" >"$FLUME_PID_FILE"
  touch "${FLUME_LOG_DIR}/${FLUME_AGENT_NAME}.log"
}

# Send SIGTERM to the recorded agent, wait for it to exit, remove the PID file.
stop() {
  local pid
  if [ ! -e "$FLUME_PID_FILE" ]; then
    echo "Flume agent is not running"
    exit 0
  fi
  pid=$(cat "$FLUME_PID_FILE")
  # Must be quoted: an unquoted empty value turns this into `[ -n ]`, which is
  # true, and `kill` would then run with no PID.
  if [ -n "$pid" ]; then
    # `>/dev/null 2>&1` instead of `&>`: under a POSIX sh, `&>` backgrounds
    # the command, so `$?` was always 0 and this wait never ended.
    kill -TERM "$pid" >/dev/null 2>&1
    while ps -p "$pid" >/dev/null 2>&1; do
      sleep 1
    done
  fi
  rm -f -- "$FLUME_PID_FILE"
  echo "Stopping $desc (flume-ng-agent): "
  # Show any flume processes that are still around; '[f]lume' keeps grep
  # itself out of the listing.
  ps -ef | grep '[f]lume'
  return 0
}

case "$1" in
  start)
    [ "$#" -ge 6 ] || usage
    start
    ;;
  stop)
    [ -n "$FLUME_AGENT_NAME" ] || usage
    stop
    ;;
  *)
    usage
    ;;
esac
# vi start_flume.sh
#!/bin/bash
#
# start_flume.sh - start the Flume agent named "agent" via the `flume`
# control script that lives next to this file.
#
# Arguments passed to `flume start`:
#   agent                  agent name (the "agent." prefix in the config)
#   conf                   configuration directory under FLUME_HOME
#   flume-conf.properties  configuration file
#   64321                  JMX port
#   44545                  HTTP monitoring port

export JAVA_HOME=/usr/java/jdk1.8.0_60

# Invoke the control script with bash rather than `sh`, because it relies on
# bash behaviour. Resolve it relative to this script so the caller's working
# directory does not matter. `flume start` already puts the agent in the
# background, so no trailing `&` is needed here.
bash "$(dirname "$0")/flume" start agent conf flume-conf.properties 64321 44545