ElasticSearch Ingest Node

** 이 포스트는 ELK(+beats)를 이용한 apache log visualization 의 후속 포스트로써 logstash 대신 elasticsearch 의 ingest node 를 사용하는 방법을 소개 합니다. **

filebeat-logstash-elasticsearch stack을 이용하여 kibana에서 대시보드를 구성하는 방법을 알아봤었습니다.
5.X 버전에서 소개된 ** ingest node ** 를 이용하면 filebeat 에서 elasticsearch 로만 데이터를 보낼때는 grok

등의 전처리 작업이 들어가더라도 logstash 없이 elasticsearch로 바로 데이터를 보낼 수 있습니다.

# ingest node

ingest node를 이용하면 filebeat의 모듈이나 logstash를 이용하지 않고도 indexing 되기 전에 전처리 과정을 넣을 수 있습니다.
ingest node는 default로 모든 node는 별도의 설정을 하지 않는 이상 ingest node로 사용 될 수 있으며 전처리의 각 단계라고 볼 수 있는 processor들을 나열한 pipeline을 정의 함으로써 logstash에서 해주던 것과 비슷하게 전처리를 할 수 있습니다.

# pipeline 정의

기존에 logstash의 filter 부분에 넣었던 grok을 processor로 정의하여 아래와 같이 pipeline을 정의 할 수 있습니다. (Dev Tools의 Console에서 작업하는 기준 입니다.)

1
2
3
4
5
PUT _ingest/pipeline/${파이프라인이름}
{
"description": "",
"processors"[]
}

아래는 실제 제가 적용한 pipeline의 예 입니다. grokremove의 두가지 processor를 등록 했고 grok의 patterns는 커스텀으로 작성 했습니다.
로그 포멧을 custom으로 찍고 있는 경우 (이 포스트에서 다루는 경우와 같은)일일히 체크하면서 pattern을 만들어야 하지만 기본적으로 setting 되어 있는 default 로그 형식을 사용하는 경우에는 pre-defined 되어 있는 값을 사용하면 훨씬 편하게 적용할 수 있습니다. (pre-defined 되어있는 값들은 여기서 확인 가능 합니다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
PUT _ingest/pipeline/ssl_access_log
{
"description": "Ingest pipeline for ssl log",
"processors": [
{
"grok": {
"field": "message",
"ignore_failure": true,
"patterns": [
"""(?:%{IP:xff}|-) %{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] "%{WORD:method} /%{DATA:request_uri} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:byte_response} %{NUMBER:byte_received} %{NUMBER:byte_sent} %{NUMBER:response_time_sec}/%{NUMBER:response_time_milisec}"""
]
}
},
{
"remove": {
"field": "message"
}
}
]
}

** 2018-05-29 추가 **

patterns 에 %{패턴:이름} 으로만 정의한 경우 해당 “패턴”에 걸린 값들을 “이름”으로 인덱싱 합니다.

하지만 인덱스 템플릿과, 맵핑에 integer나 long type으로 정의된 값들도 어떤 이유에서인지 전부 text값으로 들어가는 문제점이 발생하여 다음과 같이
%{패턴:이름:인덱싱타입} 으로 정의하게 수정하자 원하는 타입으로 인덱싱 되었습니다.

1
2
3
"patterns": [
"""(?:%{IP:xff}|-) %{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] "%{WORD:method} /%{DATA:request_uri} HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:byte_response:long} %{NUMBER:byte_received:long} %{NUMBER:byte_sent:long} %{NUMBER:response_time_sec:long}/%{NUMBER:response_time_milisec:int}"""
]

# filebeat 설정

pipeline을 만들었으니 이제 filebeat->logstash로 보내지 않고 바로 filebeat->elasticsearch 로 보내도록 설정 해 보겠습니다.
** 일반적으로 elasticsearch 로 보낼때와의 차이점은 pipeline 을 적어 준다는 점 입니다. **

filebeat에서 자동으로 생성해 주는 index가 아닌 우리가 elasticserach에 만들어 놓은 index로 데이터를 보낼때는 setup.template.namesetup.template.pattern도 정의해 줘야 합니다.

** elasticsearch template 생성에 관한 부분은 이 포스트를 참조하세요 **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
filebeat.yml 내용중 일부

filebeat.prospectors:
- type: log
enabled: true
paths:
- /data/logs/apache/*access.log*
exclude_files: ['.gz$']

...

setup.template.name: "ssl_access"
setup.template.pattern: "ssl_access-*"
setup.template.overwrite: false

...
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["${es_host}:${es_port}"]
index: "ssl_access-%{+YYYY.MM.dd}"
pipeline: "ssl_access_log"

# filebeat 실행

1
> nakjunizm@filebeat호스트:/nakjunizm/util/filebeat$ ./filebeat -e -c filebeat.yml -d "publish"

# elasticsearch 에서 확인

오늘 날짜로 생성된 ssl_access_log 인덱스에 데이터가 정상적으로 인덱싱 됐는지 확인 해 봤습니다.

1
GET ssl_access-2018.03.08/_search
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 485,
"max_score": 1,
"hits": [
{
"_index": "ssl_access-2018.03.08",
"_type": "doc",
"_id": "KYbsBWIBGR2P1wb4_g4m",
"_score": 1,
"_source": {
"response_time_sec": "0",
"offset": 28199,
"method": "GET",
"auth": "-",
"ident": "-",
"prospector": {
"type": "log"
},
"http_version": "1.1",
"source": "/data/logs/apache/ssl_access.log.20180308",
"apache_timestamp": "08/Mar/2018:14:03:54 +0000",
"request_uri": "/lib/mysite.js",
"byte_response": "3431",
"byte_sent": "4212",
"@timestamp": "2018-03-08T14:03:59.418Z",
"byte_received": "3061",
"response_time_milisec": "1316",
"beat": {
"hostname": "host",
"name": "hostname",
"version": "6.2.2"
},
"client_ip": "myip",
"status": "200"
}
},

......

정상적으로 들어와 있는것을 확인 할 수 있습니다.

Share