ysw's blog

    Flume Testing

    24 Jan 2019 |

    포스팅

    Apache Flume

    Implementation Component of Flume Component




    데이터 소스는 사용자 정의가 가능하기 때문에 Flume은 네트워크 트래픽 데이터, 소셜 미디어 생성 데이터, 전자 메일 메시지 및 거의 모든 데이터 소스를 포함하여 대량의 이벤트 데이터를 전송하는 데 사용할 수 있습니다.

    Flume은 인공수로, 용수로 등의 사전적의미를 가진다. Apache Flume은 로그(통나무)의 이중적인 의미에서 착안한 멋진 비유입니다. 통나무를 운반하는 수로의 이미지로 여러 서버로부터 로그를 수집하고 모으는 스트리밍 로그 수집기 기술을 설명하는, 딱 맞는 이름입니다. Apache Flume과 벌목장 플룸을 다음과 같이 표현으로 설명할 수 있습니다.
    Apache Flume 벌목장 Flume
    Apache Flume이란 전통적인 벌목장의 플룸이란
    여러 서버에 여러 벌목장에
    설치되는 소프트웨어로 설치되는 통나무 운반용 수로(플룸)로
    수집한 로그를 벌목한 통나무(로그)를
    원격지의 Data Lake로 산 아래 강으로
    전송하는 비동기 채널입니다. 운반하는 수로입니다.

    벌목장의 플룸의 이미지로 부터 우리는 다음과 같은 Apache Flume의 기술적인 특징을 유추할 수 있습니다.
    1. 분산환경에서 로그를 모으는 소프트웨어다. (벌목장과 강이 멀리 떨어진..)
    2. 여러곳에 위치하는 로그를 한 곳으로 모을 수 있다.
    3. 로그를 배치로 한꺼번에 보내는 것이 아니라 스트리밍하게 지속적으로 보낸다.
    4. 비동기 방식으로 처리한다.
    5. 로그를 수집하는 역할과 로그를 전송하는 역할은 개별적으로 움직인다. (Source와 Sink는 개별적인 Thread임)

    여러 서비스 제공 서버에 산재해 있는 로그들을 하나의 로그 수집서버로 모으는 역할을 수행해야하는 수집기로서 어울리는 이름이다. 전형적인 Converging Flow(??)의 구조로 구성되는 Flume은 스트림 지향의 데이터 플로우를 기반으로 하며 지정된 모든 서버로부터 로그를 수집한 후 하둡 HDFS와 같은 중앙 저장소에 적재하여 분석하는 시스템을 구축해야할 때 적합하다. Flume은 아래 4가지 사항에 대한 핵심사항을 만족시키도록 설계되었으며 이를 바탕으로 최신 아파치 오픈소스 버전을 제공하고 있다.

    시스템 신뢰성$(Reliability)$ - 장애가 발생시 로그의 유실없이 전송할 수 있는 기능
    시스템 확장성$(Scalability)$ - Agent의 추가 및 제거가 용이, Scale-up/Scale-out 방식의 확장을 모두 지원
    관리 용이성$(Manageability)$ - 간결한 구조로 관리가 용이
    기능 확장성$(Extensibility)$ - 새로운 기능을 쉽게 추가할 수 있음

    클라우데라에서 개발하던 0.x 버전을 Flume OG 라고 지칭하며 아파치 오픈소스로 이관된 이후의 1.X 버전을 Flume NG라고 부른다. Flume OG에서 Agent, Collector, Master로 구분되어지던 아키텍쳐 구조가 Flume NG에서 하나의 Agent로 통합되어 단순해졌으며 이전 버전보다 기능은 줄어들었지만 단순한 구조로 인해 확장성과 자유도가 높아서 훨씬 유연하게 업무에 적용 가능해졌다. Google 검색을 통해 Flume 관련 정보를 검색하면 아직까지도 Flume OG에 대한 내용이 훨씬 많이 조회되지만 프로젝트를 시작할 당시에 비해 지금은 Flume NG 관련 정보가 많이 확인이 된다.



    Flume NG는 하나의 Agent로 구성되는데 Agent는 내부적으로 Source와 Sink 그리고 Channel로 구성된다.

    $1. Source$ - Event를 받아 입력된 정보를 Sink로 전달한다.
    $2. Channel$ - Source와 Sink의 Dependency를 제거하고 장애에 대비하기 위해 중간 채널을 제공하며 Source는 Channel에 Event 정보를 저장하며 Sink는 채널로부터 정보를 전달받아 처리한다.
    $3. Sink$ - 채널로부터 Source가 전달한 Event 정보를 하둡 HDFS에 저장하거나 다음 Tier의 Agent 또는 DB로 전달한다. 지정된 프로토콜의 Type에 따른 처리를 진행한다.

    Flume Converging Flow



    각 서버의 로그를 수집하기 위해 HDFS로 직접 연결시 각 서버마다 연동을 위해 복잡한 코드들과 지속적인 관리가 필요하다. 유지보수 비용 측면이나 확장성 관점에 추천할 수 없는 구조이다.



    로그 수집의 권한을 Flume으로 위임함으로써 각 서버들은 고객들에게 기존보다 빠른 서비스를 제공할 수 있다. 하나의 Flume Agent가 로그를 수집함으로써 Flume Agent의 장애 발생시 로그수집이 중단될 수 있는 문제가 존재한다.



    장애 발생시 최소한의 투자비용으로 가용성을 확보할 수 있다. 장애로 인해 Flume Agent가 중지되더라도 백업 Agent를 통해 계속 로그 수집을 진행할 수 있다. 장애 대응을 위한 Failover나 로그 Event 정보를 분산하기 위한 Load balance 기능등 상황에 맞게 적용하면 된다.


    <대량 로그를 처리를 위한 일반적인 구조>

    100개 이상의 서버로부터 대량의 로그를 수집할 경우 여러 단계의 Tier를 구성하여 로그를 수집한다. 하나의 Flume Agent로 로그가 집중됨으로서 서버 부하 발생 및 러리 지연을 방지하기 위해 Tier별로 구성하여 처리한다. 하지만, 각 시스템 상황에 맞춰 Flume Agent의 Tier를 구성할 필요가 있다.

    Flume 설치

    Flume 설치 파일은 다음 URL에서 다운로드 할 수 있습니다. 2019.01 현재 최신버전은 1.9.0입니다. 설치 파일은 gz파일 형태로 배포됩니다.
    https://flume.apache.org/download.html
    설치는 압축을 푸는 것으로 완료됩니다.


    
    $ wget http://apache.mirror.cdnetworks.com/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
    $ sudo cp apache-flume-1.9.0-bin.tar.gz /usr/local/lib/apache-flume-1.9.0-bin.tar.gz
    $ sudo tar -xvf apache-flume-1.9.0-bin.tar.gz
    $ sudo mv apache-flume-1.9.0-bin/ flume/
    


    Flume 설치 디렉터리 /usr/local/lib/flume로 가정합니다. 이 디렉터리는 아래에서 {FLUME_HOME}으로 참조하겠습니다.
    그전에 자바가 미리 설치되어 있어야합니다. 아래와 같은 셋팅을 해줍니다.


    
    $ sudo add-apt-repository ppa:openjdk-r/ppa
    $ sudo apt update
    $ sudo apt install openjdk-8-jre openjdk-8-jdk
    $ sudo update-alternatives --config java
    $ sudo update-alternatives --config javac
    $ sudo update-alternatives --get-selections | grep openjdk
    
    $ export FLUME_HOME="/usr/local/lib/flume"
    $ export FLUME_CONF_DIR="$FLUME_HOME/conf"
    $ export FLUME_CLASSPATH="$FLUME_CONF_DIR"
    $ export PATH="$FLUME_HOME/bin:$PATH"
    
    $ export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
    




    Flume 기본 설정과 실행 방법

    Flume으로 서버로그를 Hadoop에 수집하는 예제를 소개합니다. Flume은 에이전트 노드와 컬렉트 노드에 설치되었음을 가정합니다.



    Agent Node
    로그가 발생하는 서버
    WAS 로그가 저장되는 디렉터리를 spooling 하여 로그 메모리 채널에 전송
    Avro Sink가 채널의 로그를 Collecting node로 전송
    port: 4545
    Collecting node
    Avro Source : Server A로부터 Avro통신을 통해 로그 수집하여 메모리 채널에 전달
    HDFS Sink : 수집된 이벤트를 HDFS에 저장

    Agent 노드 Flume 설정 및 실행

    로그를 수집하는 에이전트 노드에 다음과 같은 Flume 설정을 추가합니다. 파일명은 flume.conf입니다. 다음 예제는 /data/waslogs에 추가되는 로그 파일을 컬렉트 노드에 전송하는 설정입니다. 전송 포맷은 avro입니다.

    이 설정 파일은 /usr/local/lib/flume/conf에 하는 것으로 가정합니다.



    
    sudo vim AgentFlume.conf
    agentDataSource.sources = logsrc
    agentDataSource.channels = logChannel
    agentDataSource.sinks = avroSink
    
    # Source : Log
    agentDataSource.sources.logsrc.type = spooldir
    agentDataSource.sources.logsrc.channels = logChannel
    agentDataSource.sources.logsrc.spoolDir = /data/waslogs
    
    # Sink : Avro
    agentDataSource.sinks.avroSink.type = avro
    agentDataSource.sinks.avroSink.channel = logChannel
    agentDataSource.sinks.avroSink.hostname = 142.3.3.1
    agentDataSource.sinks.avroSink.port = 4545
    
    # Channel : Memory Channel
    agentDataSource.channels.logChannel.type = memory
    agentDataSource.channels.logChannel.capacity = 100
    ################################################
    


    에이전트 노드의 flume은 다음과 같은 명령으로 실행됩니다.



    
    cd /usr/local/lib/flume
     ./bin/flume-ng agent --conf /usr/local/flume/conf --conf-file ./conf/AgentFlume.conf --name agent01
    




    Collecting 노드 Flume 설정 및 실행

    다음은 컬렉트 노드의 flume 설정입니다. avro 포멧으로 유입되는 로그를 수집항 hadoop에 저장하는 구성입니다. 파일명은 flume.conf입니다. 이 설정 파일은 /usr/local/lib/flume/conf에 하는 것으로 가정합니다.



    
    sudo vim CollectingFlume.conf
    agentDataCollector.sources = targetSource
    agentDataCollector.channels = targetChannel
    agentDataCollector.sinks = targetSink
    
    # Source : Avro
    agentDataCollector.sources.targetSource.type = avro
    agentDataCollector.sources.targetSource.channels = targetChannel
    agentDataCollector.sources.targetSource.bind = 142.3.3.1
    agentDataCollector.sources.targetSource.port = 4545
    
    # Sink : HDFS
    agentDataCollector.sinks.targetSink.type  = hdfs
    agentDataCollector.sinks.targetSink.channel = memoryChannel
    agentDataCollector.sinks.targetSink.hdfs.path = hdfs://142.3.3.5:9000/data/stats/%Y-%m-%d/%H
    agentDataCollector.sinks.targetSink.hdfs.fileType = DataStream
    agentDataCollector.sinks.targetSink.writeFormat = Text
    agentDataCollector.sinks.targetSink.hdfs.filePrefix = access_log
    agentDataCollector.sinks.targetSink.hdfs.fileSuffix = .log
    agentDataCollector.sinks.targetSink.hdfs.threadsPoolSize = 10
    agentDataCollector.sinks.targetSink.hdfs.rollInterval = 30
    agentDataCollector.sinks.targetSink.hdfs.round = false
    local_agent.sinks.localHdfsSink.hdfs.roundValue = 5
    local_agent.sinks.localHdfsSink.hdfs.roundUnit = minute
    
    # Channel : Memory
    agentDataCollector.channels.targetChannel.type = memory
    agentDataCollector.channels.targetChannel.capacity = 100
    ################################################
    


    에이전트 노드의 flume은 다음과 같은 명령으로 실행됩니다.



    
    cd /usr/local/lib/flume
    ./bin/flume-ng agent --conf /usr/local/flume/conf --conf-file ./conf/CollectingFlume.conf --name agent02
    


    Flume의 유연한 구성 그리고 Kafka

    Flume은 다양한 Source와 Sink 타입을 제공합니다. 이런 구현체를 이용하여 다양한 형태의 Data Flow를 디자인할 수 있습니다. 아래의 그림은 일반적인 Flume 데이터 흐름 모델입니다.



    A 모델: Consolidation Model
    	여러 서버로부터 로그를 통합하여 수집하고 저장하는 모델
    	각 서버에 Flume Agent가 설치되어 로그를 통합 Flume에 저장
    	통합 Flume은 지정된 목적지에 저장
    B 모델: HA 모델
    	A 모델에서 통합 Flume 장애의 SPOF(단일 장애 포인트)에 대한 보완
    	고가용성을 위해서 통합 Flume을 이중화
    C 모델: Multi-Target Model
    	A 모델의 목적지를 복수로 지정
    D 모델: Flafka Model
    	Flume이 Kafka의 Producer와 Consumer 역할 수행
    	통합 Flume을 Kafka로 대체하여 고가용성 및 확장성, 유연성을 확보
    	Flume을 이용하여 Kafka 개발 요소 제거
    	
    Flume의 가장 큰 취약점은 데이터의 안정성입니다. Flume은 Channel로 메모리와 파일 그리고 JDBC를 제공합니다. 메모리 타입은 처리 성능은 좋지만, Flume 장애 발생 시 데이터가 유실의 문제가 있습니다. 반면 파일 타입을 사용하면 데이터 안정성은 향상되지만, 성능이 크게 떨어집니다. 그리고 고가용성 모드로 관리하기 어렵다는 것입니다. 이러한 문제는 Flume과 Kafka를 결합함으로써 해결할 수 있습니다. 최근에 로그/이벤트 수집 환경을 구성 시 Flume만으로 구성하는 경우는 거의 없습니다. 아래의 그림와 같이 Flafka Model을 사용합니다.



    Flume의 장점은 다양한 소스와 목적지에 대한 컴포넌트가 이미 구현되어 있다는 것입니다. 일반적으로 Flume 설치 및 설정만으로 작업을 완료할 수 있습니다. (물론 기능 확장 가능합니다.) Flume의 단점은 데이터를 저장하는 부분에서 장애가 발생할 경우, 데이터 유실의 가능성이 있고 확장 구성이 복잡하다는 것입니다. Kafka의 장점은 저장된 데이터를 안전하게 관리할 수 있고, 구성이 간단하고 확장성이 좋다는 것입니다. Kafka의 단점은 데이터 수집기(producer)와 데이터 처리기(Consumer)를 대부분 사용자가 구현해야 한다는 것입니다.

    이 구 컴포넌트를 함께 사용하면 각자의 취약점을 보완하고 강점을 부각할 수 있습니다. Kafka의 확장성과 관리 편의성 그리고 데이터 안정성을 확보하면서, Flume 컴포넌트 구성을 통해서 사용 편의성을 높일 수 있습니다.

    Flume 모니터링

    Flume을 모니터링하는 방법은 기본적으로 3가지가 있습니다.


    	Ganglia
    	JMX
    	JSON Reporting
    	


    Flume은 JSON 리포팅 기능을 제공합니다. flume 실행 시 -Dflume.monitoring.type=http 옵션을 추가하여 웹 기반 모니터링이 가능합니다. 리포팅 기본 포트는 41414이며 변경 가능합니다. http://<모니터링 대상 Flum IP>:41414/metrics 호출하면 아래와 같은 정보가 출력됩니다.


    
    {
       "SINK.avroSink":{
          "BatchCompleteCount":"133",
          "ConnectionFailedCount":"0",
          "EventDrainAttemptCount":"13300",
          "ConnectionCreatedCount":"1",
          "Type":"SINK",
          "BatchEmptyCount":"0",
          "ConnectionClosedCount":"0",
          "EventDrainSuccessCount":"13300",
          "StopTime":"0",
          "StartTime":"1398215901251",
          "BatchUnderflowCount":"0"
       },
       "SOURCE.otvSource":{
          "OpenConnectionCount":"0",
          "Type":"SOURCE",
          "AppendBatchAcceptedCount":"133",
          "AppendBatchReceivedCount":"133",
          "EventAcceptedCount":"13300",
          "AppendReceivedCount":"0",
          "StopTime":"0",
          "StartTime":"1398215901332",
          "EventReceivedCount":"13300",
          "AppendAcceptedCount":"0"
       },
       "CHANNEL.otvChannel":{
          "EventPutSuccessCount":"13300",
          "ChannelFillPercentage":"0.0",
          "Type":"CHANNEL",
          "EventPutAttemptCount":"13300",
          "ChannelSize":"0",
          "StopTime":"0",
          "StartTime":"1398215901247",
          "EventTakeSuccessCount":"13300",
          "ChannelCapacity":"100",
          "EventTakeAttemptCount":"13301"
       }
    }
    


    가져온곳: http://www.nextree.co.kr/p2704/
    http://taewan.kim/post/flume_images/