ELK(+beats)를 이용한 apache log visualization

file beats+logstash+elasticsearch+kibana를 이용하여
apache 로그에서 의미있는 데이터를 시각화 하는 방법을 알아보겠습니다.

# 설치

Elastic stack 설치는 대부분 압축 해제하고 yml 로 된 설정파일을 수정하는것으로 끝납니다.
Elastic Search의 경우 OS 셋팅변경이 필요할 수도 있으니 여기를 참조하세요.

# File Beat 설정

1
2
3
4
5
6
7
8
9
10
filebeat.prospectors:
- type: log
enabled: true
paths:
- /nakjunizm/logs/platform/apache/*access.log*
exclude_files: ['.gz$']
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
hosts: ["logstash호스트:5043"]

filebeat의 prospector는 수집기 각각을 나타냅니다. 설정 이름은 상당히 직관적 입니다.
위 설정대로 저는 /nakjunizm/logs/platform/apache/access.log 를 수집하는데 .gz 파일은 수집하지 않을것 입니다.

# Logstash 설정 part1

일단 beats input을 받아서 stdout으로 출력되는 간단한 설정파일 작성해 보겠습니다.

logstash home 에 filebeat-pipeline 이라는 설정파일을 하나 만들어서 아래 값을 넣습니다.

1
nakjunizm@logstash호스트:/nakjunizm/util/logstash$ vi filebeat-pipeline.conf
1
2
3
4
5
6
7
8
9
10
input {
beats {
port => "5043"
}
}
#filter {
#}
output {
stdout { codec => rubydebug }
}

기본적으로 logstash의 설정파일은 위의 구조를 가지고 있습니다
input : input source로 db, file, beats 등등 많은 형식을 source로 가질 수 있습니다. 여기로 가보시면 엄청나게 많은 input plugin이 있는것을 확인하실수 있습니다.

  • filter : logstash의 강력한 기능중의 하나 입니다. input으로 들어온 데이터를 aggregate, mutate, grok 등 각각의 이벤트에 수정을 가할 수 있습니다.
  • output : filter까지 마친 데이터를 output plugin을 통해 보낼 수 있습니다. 제 목표는 Elastic Search 로 보내는 것 입니다. 지금은 확인을 위해 console에 결과를 출력하도록 stdout 플러그인을 사용하겠습니다.

# beats-logstash 정상동작 확인

위와같이 간단히 설정을 마치고 난 뒤 정상적으로 beat에서 데이터를 보내주는지 확인해 봅시다.
저는 logstash와 filebeat를 각각 다른 호스트에 설치하였고 아래와깉이 logstash를 구동시켜서 정상적으로 떴는지 확인 한 후 filebeat호스트에서 filebeat를 실행 시켰습니다.

1
2
nakjunizm@logstash호스트:/nakjunizm/util/logstash$ bin/logstash -f filebeat-pipeline.conf
nakjunizm@filebeat호스트:/nakjunizm/util/filebeat$ ./filebeat -e -c filebeat.yml -d "publish"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"beat" => {
"hostname" => "filebeat호스트",
"version" => "6.1.0",
"name" => "filebeat호스트"
},
"host" => "filebeat호스트",
"message" => "- some.remote.ip.19 - - [22/Dec/2017:10:57:14 +0000] \"POST /api1/v1/auth/token HTTP/1.1\" 200 104 4742 5189 0/14987",
"@version" => "1",
"offset" => 520032,
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"@timestamp" => 2017-12-26T02:17:56.028Z,
"prospector" => {
"type" => "log"
},
"source" => "/nakjunizm/logs/platform/apache/ssl_access.log.20171222"

정상적으로 출력되는것을 확인할 수 있습니다

# Logstash 설정 part2

** filebeat ** 와 ** logstash ** 가 정상적으로 연결되었고 동작하는것을 확인 했으니 이제 filter를 추가해서 로그파일과 ES index mapping 간의 관계를 설정 해 주겠습니다.
위의 결과에서 보면 message에 로그 한줄이 그대로 들어가 있는걸 확인할 수 있는데 그 message를 각각의 필드로 쪼개서 맵핑하는 작업입니다.
%{패턴:맵핑} 구조로 되어 있고 대부분의 로그는 패턴으로 이미 정의되어 있으니 링크를 확인 해 보시고 혹시라도 내가 직접 만든 어플리케이션 로그라서 패턴이 전혀 정의되어 있지 않는 경우에는 regex로 직접 정의할 수도 있습니다.

1
nakjunizm@logstash호스트:/nakjunizm/util/logstash$ vi filebeat-pipeline.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
input {
beats {
port => "5043"
}
}
filter {
grok {
match => [ "message", '(?:%{IP:xff}|-) %{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{DATA:request_uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:status} %{NUMBER:byte_response} %{NUMBER:byte_received} %{NUMBER:byte_sent} %{NUMBER:response_time_sec}/%{NUMBER:response_time_milisec}' ]
}
}
output {
stdout { codec => rubydebug }
}

https://grokdebug.herokuapp.com 에서 한단어씩 끊어가면서 제대로 체크 되는지 확인하는데, grokdebug 홈페이지에서 더블쿼테이션은 이스케이프 해줘야(") 정상적으로 동작하니 주의하세요!
** 확인을 마치고 실제로 filter에 적용할 때는 내용안에 더블쿼테이션이 있으므로 싱글쿼테이션으로 감싸줘 합니다.**
저는 apache로그에 x-forwarded-for라던지 응답시간 초/밀리초 등을 추가해서 위와같은 삽질을 좀 했는데,
** default apache로그 설정을 사용하는 시스템 ** 에서는

  1. 간단히 아래와 같이 코어 패턴으로 처리하거나
    (%{COMBINEDAPACHELOG}등과 같이 미리 정의된 코어 패턴들은 여기에서 확인 가능합니다.)
    1
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  2. filebeat에서 logstash로 보내는 과정을 생략하고!! ** filebeat의 apache2모듈 ** 을 사용해서 바로 elasticsearch로 로그데이터를 보낼수도 있으니 아래 링크들을 참조해 보시면 좋을것 같습니다. 심지어 filebeat의 apache2모듈은 kibana에 기본적인 시각화 차트들과 대쉬보드까지 제공해주는 강력한 모듈 입니다. apache2이외에도 많은 filebeat 모듈들이 존재하니 확인해보세요!!
    https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-module-apache2.html

# filter 가 잘 적용 됐는지 확인

1
nakjunizm@logstash호스트:/nakjunizm/util/logstash-6.1.1$ bin/logstash -f filebeat-pipeline.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"ident" => "-",
"request_uri" => "api2/v1/notification?type=UNREAD",
"byte_received" => "2100",
"byte_response" => "55",
"offset" => 222231,
"method" => "GET",
"http_version" => "1.1",
"source" => "/nakjunizm/logs/platform/apache/ssl_access.log.20171218",
"@timestamp" => 2017-12-26T08:21:15.686Z,
"auth" => "-",
"prospector" => {
"type" => "log"
},
"response_time_milisec" => "13303",
"response_time_sec" => "0",
"beat" => {
"name" => "filebeat호스트",
"version" => "6.1.0",
"hostname" => "filebeat호스트"
},
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"apache_timestamp" => "18/Dec/2017:06:40:18 +0000",
"host" => "filebeat호스트",
"byte_sent" => "885",
"message" => "- some.remote.ip.171 - - [18/Dec/2017:06:40:18 +0000] \"GET /api2/v1/notification?type=UNREAD HTTP/1.1\" 200 55 2100 885 0/13303",
"status" => "200",
"@version" => "1",
"client_ip" => "some.remote.ip.171"
}

filter를 설정하기 전에 message에 모든 내용이 다 들어오던것과는 달리, message 의 내용을 각각 정의한대로 잘 쪼개서 출력해 주는것을 확인 할 수 있습니다.

# ElasticSearch 로 보내도록 output plugin 설정

1
nakjunizm@logstash호스트:/nakjunizm/util/logstash$ vi filebeat-pipeline.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {
beats {
port => "5043"
}
}
filter {
grok {
match => [ "message", '(?:%{IP:xff}|-) %{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{DATA:request_uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:status} %{NUMBER:byte_response} %{NUMBER:byte_received} %{NUMBER:byte_sent} %{NUMBER:response_time_sec}/%{NUMBER:response_time_milisec}' ]
remove_field => [ message ]
}
}
output {
elasticsearch {
hosts => "elasticsearch호스트:9200
}
}

message 필드는 더이상 필요없으니 remove_field 했고, output은 elasticsearch로 보내도록 설정했습니다.
이 설정 그대로 실행시켜보면 elasticsearch 에 logstash-* 로 자동으로 인덱스가 생성 되고 데이터가 잘 들어가는것을 확인 할 수 있습니다.

# ElasticSearch Template 생성

자동으로 생성된 logstash-* 인덱스를 살펴보면 불필요하게 대부분의 필드를 인덱싱 하는것을 확인할 수 있습니다. 키워드 검색의 용도로 저장하는것이 아니기 때문에 각각의 필드를 형태소 분석기에 돌릴 필요는 없으니 template을 생성해 보겠습니다.

# kibana의 devtool에서 현재 mapping 상태 확인

1
GET logstash-2017.12.26/_mappings
1
2
3
4
5
6
7
8
9
10
11
12
....
"apache_timestamp": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
....

logstash-yyyy.MM.dd index에 doc type과 _default type 두가지의 type이 생성되어 있는것을 확인할 수 있고,
대부분의 필드는 위와같은 형식으로 되어있음을 확인할 수 있습니다.
doc type에 있는값 들 중에서 제가 분석하고 싶은 field들을 먼저 뽑아보면

  • request_uri
  • client_ip
  • response_time_sec/response_time_milisec
  • status
  • host

정도가 있을것 같은데. 위 필드를 제외한 나머지 필드는 인덱싱 하지 않거나
keyword 검색정도만 가능하게 설정 하겠습니다.

ES2.X 에서는 아래와같이 “type” : “string”, “index” : “not_analyzed” 를 이용하였지만
5.X 이상부터는 “type” : “keyword”, “index” : true 를 통해 설정 가능합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ES2.x
{
"foo": {
"type" : "string",
"index": "not_analyzed"
}
}

ES5.x 이상
{
"foo": {
"type": "keyword",
"index": true
}
}

위 조건을 모두 충족시키는 템플릿을 아래와 같이 만들어 봤습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
{
"order": 0,
"index_patterns": "ssl_access-*",
"settings": {
"index": {
"analysis": {
"filter": {},
"analyzer": {}
}
}
},
"mappings": {
"doc": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"apache_timestamp": {
"type": "date",
"format": "dd/MMM/yyyy:HH:mm:ss Z"
},
"auth": {
"type": "keyword",
"index": true
},
"beat": {
"properties": {
"hostname": {
"type": "keyword",
"index": true
},
"name": {
"type": "keyword",
"index": true
},
"version": {
"type": "keyword",
"index": true
}
}
},
"byte_received": {
"type": "integer"
},
"byte_response": {
"type": "integer"
},
"byte_sent": {
"type": "integer"
},
"client_ip": {
"type": "ip"
},
"geoip": {
"dynamic": "true",
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"host": {
"type": "keyword",
"index": true
},
"http_version": {
"type": "half_float"
},
"ident": {
"type": "keyword",
"index": true
},
"message": {
"type": "keyword",
"index": false
},
"method": {
"type": "keyword",
"index": true
},
"offset": {
"type": "long"
},
"prospector": {
"properties": {
"type": {
"type": "keyword",
"index": true
}
}
},
"request_uri": {
"type": "keyword",
"index": true
},
"response_time_milisec": {
"type": "integer"
},
"response_time_sec": {
"type": "integer"
},
"source": {
"type": "keyword",
"index": true
},
"status": {
"type": "integer"
},
"tags": {
"type": "text",
"norms": false,
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"xff": {
"type": "ip"
}
}
}
}
}

# 작성한 템플릿 ES에 등록

위에서 작성한 json을 file로 만들어서 아래와 같이 실행해도 되지만 (ssl_access 라는 이름으로 템플릿 생성)

1
curl -XPUT elastic호스트:포트/_template/ssl_access -d@ssl_access.json

저는 일단 kibana의 DevTools에서 직접 돌렸습니다.

1
2
3
4
5
{
"order": 0,
"index_patterns": "ssl_access-*",
.... 위에 작성한 내용과 동일 ...
}

# logstash 설정 part3

이제 모든 준비를 완료 했으니 logstash의 설정을 마지막으로 바꿔보겠습니다.

1
nakjunizm@logstash호스트:/nakjunizm/util/logstash$ vi filebeat-pipeline.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
input {
beats {
port => "5043"
}
}
filter {
grok {
match => [ "message", '(?:%{IP:xff}|-) %{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{DATA:request_uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:status} %{NUMBER:byte_response} %{NUMBER:byte_received} %{NUMBER:byte_sent} %{NUMBER:response_time_sec}/%{NUMBER:response_time_milisec}' ]
remove_field => [ message ]
}
}
output {
elasticsearch {
hosts => "elasticsearch호스트:9200"
index => "ssl_access-%{+YYYY.MM.dd}"
}
}

# 잘 동작하는지 확인

앞의 모든 설정을 마친 상태로 logstash를 구동하고, filebeat의 data폴더아래있는 데이터를 삭제한 후 (offset정보등이 들어있는)
filebeat재구동해서 ElasticSeach에 데이터를 넣고 잘 동작하는지 확인 합니다.

1
GET ssl_access-2017.12.26/_search
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 25259,
"max_score": 1,
"hits": [
{
"_index": "ssl_access-2017.12.27",
"_type": "doc",
"_id": "6AorlmAB1NCOl3BsnBUx",
"_score": 1,
"_source": {
"method": "POST",
"ident": "-",
"apache_timestamp": "04/Dec/2017:03:18:45 +0000",
"byte_received": "4710",
"response_time_sec": "0",
"client_ip": "some.remote.ip.19",
"beat": {
"version": "6.1.0",
"name": "filebeat호스트",
"hostname": "filebeat호스트"
},
"response_time_milisec": "3457",
"byte_response": "66",
"status": "200",
"source": "/nakjunizm/logs/platform/apache/ssl_access.log.20171204",
"byte_sent": "4996",
"prospector": {
"type": "log"
},
"@version": "1",
"http_version": "1.1",
"offset": 61270,
"tags": [
"beats_input_codec_plain_applied"
],
"request_uri": "api1/v1/auth/token",
"@timestamp": "2017-12-27T04:12:07.575Z",
"auth": "-",
"host": "filebeat호스트"
}
},
.....

# Kibana 에서 시각화 하기

ElasticSearch에 내가 원하는 형태로 데이터가 들어오고 있다면 kibana를 이용해서 데이터를 시각화 하는 부분은 상대적으로 쉽습니다. 저는 Y-Axis에 response_time_sec의 평균과 최대값을 넣고, X-Axis에 request_uri를 넣어서 그래프를 그려보니
request_uri별로 응답속도의 평균값과 최대값이 나와서 평균대비 max가 높은 uri들은 어떤게 있는지 한눈에 알아볼 수 있는 그래프를 만들어 봤습니다.
이 부분은 각자의 데이터와 분석하려는 목적에 따라 판이하게 달라질 수 있으니 kibana 홈페이지에서 참조하세요
https://www.elastic.co/guide/en/kibana/current/index.html

Share