OpenSource

Flume 설치

아르비스 2016. 6. 22. 09:57

ElasticSearch를 좀 더 편하게 사용하기 위해

Flume과 Kafka를 함께 사용하게 되었다.

Kafka Queue에 등록된 Event(Message)를 Flume을 통해 ElasticSearch에 저장하는 과정을 정리한다.


1. Flume 설치

 - Site : https://flume.apache.org/index.html


 1) Download

   # wget https://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

   (dyn/closer.lua 주소는 미러 선택용 HTML 페이지를 내려주므로, tar.gz 파일을 직접 받으려면 위의 archive 주소나 미러의 실제 파일 주소를 사용한다.)


 2) unpack

   # tar -zxvf apache-flume-1.6.0-bin.tar.gz


 3) Source, Sink 유형 참조

   https://flume.apache.org/FlumeUserGuide.html


 4) flume 환경 설정

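   # cd apache-flume-1.6.0-bin/conf

   (압축을 푼 Flume 디렉터리의 conf. 아래 실행 script는 Flume 설치 경로를 /home/flume/flume-1.6.0 으로 가정한다.)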

   # cp flume-conf.properties.template flume-conf.properties

   # vi flume-conf.properties

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
#
# NOTE: in a .properties file only lines starting with '#' or '!' are
# comments. Text such as "// note" after a value becomes part of the value,
# so every explanation below is on its own '#' line.

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = loggerSink

# For each one of the sources, the type is defined
# Source type
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
# Bind the source to the channel
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
# Sink type
agent.sinks.loggerSink.type = logger

# Specify the channel the sink should use
# Bind the sink to the channel
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
# Channel type
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# Channel capacity (number of events)
agent.channels.memoryChannel.capacity = 100


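이 파일에서 Source, Channel, Sink를 어떻게 설정하느냐에 따라 다양한 Flow를 구성할 수 있다.

(주의: properties 파일에서는 '#' 또는 '!'로 시작하는 줄만 주석이다. 값 뒤에 '// 설명'을 붙이면 그 문자열까지 값으로 읽히므로, 설명은 별도의 '#' 줄에 적어야 한다.)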


ES 연결을 위해 다음처럼 변경한다. (Kafka Source → memory Channel 구성이며, sink는 우선 logger로 두어 Kafka 메시지가 Flume까지 들어오는지 먼저 확인한다.)

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# (only lines starting with '#' are comments in a .properties file)

# Agent components
agent.sources = src1
agent.channels = ch1
agent.sinks = sink1

# Kafka source config (Flume 1.6 KafkaSource, consumes via ZooKeeper)
agent.sources.src1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.src1.topic = TestAction
agent.sources.src1.groupId = index
agent.sources.src1.zookeeperConnect = 127.0.0.1:2181
agent.sources.src1.kafka.consumer.timeout.ms = 100
# KafkaSource puts each batch into the channel in ONE transaction, so the batch
# must fit in ch1's transactionCapacity (100 below). The source's default
# batchSize is 1000, which overflows the memory channel transaction as soon as
# more than 100 messages are waiting in the topic.
agent.sources.src1.batchSize = 100

# Bind the source to the channel
agent.sources.src1.channels = ch1

# Memory channel buffering events between the Kafka source and the sink
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000
# Must be >= the source's and the sink's batch size
agent.channels.ch1.transactionCapacity = 100

# Bind the sink to the channel
agent.sinks.sink1.channel = ch1

# Describe the sink: the logger sink logs each event, which is enough to verify
# that Kafka -> Flume works before Elasticsearch is wired in.
agent.sinks.sink1.type = logger

# To store events in Elasticsearch instead, replace the logger sink with
# something like the following. The elasticsearch and lucene-core jars that
# match your ES server must be placed in Flume's lib directory.
#agent.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
#agent.sinks.sink1.hostNames = 127.0.0.1:9300
#agent.sinks.sink1.indexName = flume
#agent.sinks.sink1.indexType = logs
#agent.sinks.sink1.clusterName = elasticsearch
#agent.sinks.sink1.batchSize = 100



 5) 실행

 # ./bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-conf.properties --name agent

 (--name 값은 설정 파일의 property 접두어와 같아야 한다. 위 설정은 모두 'agent.'로 시작하므로 --name agent 를 사용한다.)



※ 실행 script

# vi flume

#!/bin/bash
#
# flume - start/stop wrapper for one Flume NG agent.
#
# Usage: flume {start|stop} <agent name> <conf dir> <conf file> <jmx port> <monitoring port>
#   $1  action: start | stop
#   $2  agent name; must match the property prefix used in the config file
#   $3  config directory, relative to FLUME_HOME (e.g. "conf")
#   $4  config file name inside that directory (e.g. "flume-conf.properties")
#   $5  JMX port (stored in FLUME_JMX_PORT; this script does not pass it on)
#   $6  HTTP monitoring port (passed as -Dflume.monitoring.port)
#
# FLUME_HOME defaults to /home/flume/flume-1.6.0 and may be overridden from the
# environment; every other path is derived from it.

FLUME_HOME=${FLUME_HOME:-/home/flume/flume-1.6.0}
FLUME_LOG_DIR=${FLUME_HOME}/log
FLUME_CONF_DIR=${FLUME_HOME}/$3
FLUME_CONF_FILE=${FLUME_CONF_DIR}/$4

EXEC_PATH=${FLUME_HOME}/bin/flume-ng
FLUME_AGENT_NAME=$2
# One PID file per agent name, so several agents can share one FLUME_HOME.
FLUME_PID_FILE=${FLUME_HOME}/flume-ng-${FLUME_AGENT_NAME}.pid
# NOTE(review): not referenced anywhere in this script; JMX is not enabled.
FLUME_JMX_PORT=$5
FLUME_MONITORING_PORT=$6
FLUME_CLASSPATH=${FLUME_HOME}/conf

desc="Flume NG agent daemon"


#######################################
# Launch the Flume agent in the background and record its PID.
# Globals (read): FLUME_PID_FILE, EXEC_PATH, FLUME_CLASSPATH, FLUME_CONF_DIR,
#   FLUME_CONF_FILE, FLUME_AGENT_NAME, FLUME_MONITORING_PORT, FLUME_LOG_DIR
# Outputs: the agent's stdout/stderr are appended to
#   ${FLUME_LOG_DIR}/<agent name>.init.log
# Exits: 0 if the process recorded in the PID file is still alive;
#   1 if the agent name is missing or the config file is unreadable.
#######################################
start() {
  local pid
  local -a monitoring_opts=()

  if [[ -e "${FLUME_PID_FILE}" ]]; then
    pid=$(cat -- "${FLUME_PID_FILE}")
    if [[ "${pid}" =~ ^[0-9]+$ ]] && kill -0 "${pid}" 2>/dev/null; then
      echo "Flume agent is running; please, stop it first"
      exit 0
    fi
    # Left behind by an agent that crashed or was killed; without this the
    # agent could never be started again until the file is removed by hand.
    echo "Removing stale PID file ${FLUME_PID_FILE}"
    rm -f -- "${FLUME_PID_FILE}"
  fi

  if [[ -z "${FLUME_AGENT_NAME}" ]]; then
    echo "Usage: $0 start <agent name> <conf dir> <conf file> <jmx port> <monitoring port>" >&2
    exit 1
  fi
  if [[ ! -r "${FLUME_CONF_FILE}" ]]; then
    echo "Flume config file not readable: ${FLUME_CONF_FILE}" >&2
    exit 1
  fi

  # The output redirection below fails if the log directory does not exist.
  mkdir -p -- "${FLUME_LOG_DIR}"

  if [[ -n "${FLUME_MONITORING_PORT}" ]]; then
    monitoring_opts=(-Dflume.monitoring.type=http "-Dflume.monitoring.port=${FLUME_MONITORING_PORT}")
  fi

  "${EXEC_PATH}" agent \
    --classpath "${FLUME_CLASSPATH}" \
    --conf "${FLUME_CONF_DIR}" \
    --conf-file "${FLUME_CONF_FILE}" \
    --name "${FLUME_AGENT_NAME}" \
    "${monitoring_opts[@]}" \
    -Duser.timezone=GMT \
    >> "${FLUME_LOG_DIR}/${FLUME_AGENT_NAME}.init.log" 2>&1 &
  # PID of the background process running flume-ng; stop() signals this PID.
  echo $! > "${FLUME_PID_FILE}"

  touch -- "${FLUME_LOG_DIR}/${FLUME_AGENT_NAME}.log"
}


#######################################
# Stop the agent whose PID is stored in FLUME_PID_FILE: send SIGTERM, then
# wait for the process to exit.
# Globals (read): FLUME_PID_FILE, desc,
#   FLUME_STOP_TIMEOUT (optional; seconds to wait, default 60)
# Exits: 0 if there is no PID file.
# Returns: 0 once the process is gone (PID file removed);
#   1 if it is still alive after the timeout (PID file kept so the operator
#   can investigate or retry).
#######################################
stop() {
  local pid
  local waited=0
  local limit=${FLUME_STOP_TIMEOUT:-60}

  if [[ ! -e "${FLUME_PID_FILE}" ]]; then
    echo "Flume agent is not running"
    exit 0
  fi

  pid=$(cat -- "${FLUME_PID_FILE}")
  if [[ "${pid}" =~ ^[0-9]+$ ]]; then
    kill -TERM "${pid}" 2>/dev/null
    # Poll for exit, but do not hang forever if the agent ignores SIGTERM.
    while kill -0 "${pid}" 2>/dev/null; do
      if (( waited >= limit )); then
        echo "Flume agent (pid ${pid}) did not exit within ${limit}s; PID file kept" >&2
        return 1
      fi
      sleep 1
      waited=$(( waited + 1 ))
    done
  else
    # An empty or corrupt PID file cannot refer to a live agent.
    echo "Ignoring invalid PID file content: '${pid}'" >&2
  fi

  rm -f -- "${FLUME_PID_FILE}"

  echo "Stopping $desc (flume-ng-agent): "
  # List any flume processes still running; '[f]lume' keeps grep itself out.
  ps -ef | grep '[f]lume'
  return 0
}


# Dispatch on the requested action ($1). The remaining positional arguments
# ($2..$6) are read by the variable definitions at the top of the script.
case "$1" in
  start)
    start
    ;;
  stop)
    stop
    ;;
  *)
    echo "Usage: $0 {start|stop} <agent name> <conf dir> <conf file> <jmx port> <monitoring port>" >&2
    exit 1
    ;;
esac

 


# vi start_flume.sh

#!/bin/bash
#
# start_flume.sh - start the Flume agent named "agent" via the flume wrapper
# script located in the same directory as this file.

export JAVA_HOME=/usr/java/jdk1.8.0_60

# Wrapper arguments:
#   action  agent name  conf dir  conf file              JMX port  monitoring port
# The wrapper is a bash script (#!/bin/bash) that uses bash-only syntax, so it
# is run with bash explicitly; `sh flume` would bypass that shebang. No
# trailing '&': start() already backgrounds the agent, and this keeps the
# wrapper's exit status (e.g. "config file not readable") visible.
bash "$(dirname "$0")/flume" start agent conf flume-conf.properties 64321 44545