
Data Collection Service - Flume
24 Jan 2019 | IoT포스팅
1.2 Flume
Docker Image 생성
Flume Docker Container를 생성하기 위해서는 Docker image가 필요하다. Docker image는 이미 만들어진 image를 다운 받거나 Dockerfile을 이용하여 image를 빌드하고 생성할 수 있다. Flume Docker image를 빌드하기 위한 Dockerfile의 내용은 아래와 같다.
FROM resin/rpi-raspbian
#Update & Install wget, vim
RUN apt-get update
RUN apt-get -y install wget
RUN apt-get -y install vim
RUN apt-get -y install python \
python-dev \
python-pip \
python-virtualenv \
--no-install-recommends
RUN pip install pyserial
RUN mkdir -p /var/log/apache/flumeSpool
RUN touch /var/log/apache/flumeSpool/sensor.txt
#Timezone
RUN cp /usr/share/zoneinfo/Asia/Seoul /etc/localtime
#Install Oracle JAVA
RUN mkdir -p /opt
RUN curl -O -v -j -k -L -H "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-arm32-vfp-hflt.tar.gz
RUN tar -xvzf jdk-8u131-linux-arm32-vfp-hflt.tar.gz -C /opt
#Configurate environmental variables
ENV JAVA_HOME /opt/jdk1.8.0_131
ENV PATH $PATH:/opt/jdk1.8.0_131/bin
RUN ln -s /opt/jdk1.8.0_131/bin/java /usr/bin/java
#Install Flume
RUN sudo wget --no-check-certificate http://www.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz -O - | tar -zxv
RUN sudo mv apache-flume-1.9.0-bin /flume
ADD plugins.d /flume/plugins.d
ADD flume-conf.properties /flume/conf/
#Csharp Programming
COPY PySerial.py /flume/
COPY autosensor.sh /flume/
#Working directory
WORKDIR /flume
이미지 빌드
컨테이너 생성후 Flume Configuration 파일, Python 파일, 센서수집 파일이 필요하기 때문에 Dockerfile, flume-conf.properties, plugins.d Pyserial.py, autosensor.sh를 가진 폴더에서 Dockerfile을 빌드한다.컨테이너 생성
컨테이너에서 시리얼통신 사용시 --privileged 옵션을 줘야지만 컨테이너에서 정상적인 Serial 통신이 가능하다.Configuration Setting
flume에서 서버에 있는 kafka로 값을 보내기 위해서는 flume설정을 알맞게 바꿔줘야 한다. 이러한 flume의 설정파일에 대한 내용은 다음과 같다. sudo vim /flume/conf/flume-conf.properties
# Name the components on this agent
agent.sources = source1
agent.sinks = sink1
agent.channels = channel1
# The source1
agent.sources.source1.type = exec
agent.sources.source1.channels = channel1
agent.sources.source1.command = sh /flume/autosensor.sh
agent.sources.source1.batchSize = 1
agent.sources.source1.batchTimeout = 1000
# The channel
agent.channels.channel1.type = memory
# The sink1
agent.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.sink1.topic = JNU01_rasp
agent.sinks.sink1.brokerList = gist:9092
agent.sinks.sink1.requiredAcks = 0
agent.sinks.sink1.batchSize = 1
# Bind the source and sink to the channel
agent.sources.source1.channels = channel1
agent.sinks.sink1.channel = channel1
Flume의 Source Type를 exec으로 설정하여 쉘스크립트를 실행시켜서 읽어들여 콘솔로 출력한 센서값들이 Flume의 Source가 되도록 설정하엿다.
이후 sink.topic에 사용하고자하는
#!/bin/sh
sensor="/var/log/apache/flumeSpool/sensor.txt"
temp="/var/log/apache/flumeSpool/sensor_Flume.txt"
while [ 1 ]
do
if [ -e $sensor ]
then
mv /var/log/apache/flumeSpool/sensor.txt $temp
tail -1 $temp
rm $temp
sleep 1
fi
done
브로커에 대한 주소 및 포트 설정시 편의를 위해, /etc/hosts에 추가하고자 하는 host를 Container 실행
Docker container가 실행중일 때, attach 명령어를 통하여 Container에 접속 가능하다. Serial 통신포트를 설정한 후 Flume Agent를 실행한다. Pyserial.py 파일을 이용하여 아두이노에서 보내는 센서값 들을 라즈베리파이의 지정된 폴더안에 저장하고, Agent를 실행시켜 센서값들을 라즈베리파이의 지정된 폴더안에 저장하고, Agent를 실행시켜 센서값들을 Kafka broker에게 전송한다. 이 때 Kafka Zookeeper와 broker는 실행 중이어야 한다. sudo python Pyserial.py 위의 명령어를 실행시킨 이후 다른 터미널을 켜서 아래의 명령어를 실행한다. sudo bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name가져온곳: URL