본문 바로가기

카테고리 없음

logstash, File Input plugin 사용 예

Elastic Stack 으로 간단한 로그 모니터링 기능 구성해서 담당자들에게 보여주고 의견 들어보니 로그 지속적인 수집 및 시각화는 불필요하고 분석 대상 로그를 파일 단위로 실시간 파싱, Search API 학습 없이 장애 원인 추적 및 분석할 수 있는 기능 등이 우선 필요하다는군요.

요구사항만 놓고 보면 정규식과 (*)nix 유틸 조합한 쉘스크립트로 간단하게 처리 가능해보이긴 하지만 Elastic Stack 도입을 위한 포석을 깔기 위해 Elastic Stack을 가능한 활용해보기로 하고 작업 진행 중입니다.

진행 내용 메모.

Windows 환경이라 batch 파일 이용해서 다음과 같이 구성해보았습니다.

@echo off
set LF=%1
set LOG_FILE=%LF:\=/%

set HOST=ES_IP:ES_PORT
set TMPL=.../ee-log-template.json

del ...\data\plugins\inputs\file\.sincedb*

REM es index 삭제
curl -XDELETE %HOST%/eep-log

start bin\logstash.bat -f config\ee-log.yml

rem timeout 120 > NUL
Powershell.exe -executionpolicy remotesigned -File .\chk_offset.ps1 %LF%

REM logstash 중단
setlocal enableDelayedExpansion
for /f "tokens=1" %%i in ('jps -m ^| find "Logstash"') do ( taskkill /F /PID %%i )
for /f "tokens=1,2,* delims=," %%a in ('tasklist /v /fo csv ^| findstr "logstash-eelog"') do ( if "%%~a"=="cmd.exe" taskkill /pid "%%~b" )

 

  1. 배치 파일 실행 패러미터로 로그 파일 위치 넘겨 받을 때 경로 구분자를 "\" 에서 "/"로 변경.
  2. 이전 분석 데이타(index) 삭제.
  3. logstash 실행하여 로그 파일 파싱 처리.
  4. 파워쉘 스크립트로 logstash 에서 로그 파일 다 읽어들였는지 검사.
  5. 다 읽어들였다면 logstash 중단시키고 해당 팝업창 닫기.


지속적인 로그 수집이 필요없으므로 FileBeat 사용하지 않고 3 단계에서 Logstash File Input plugin 을 이용하여 직접 로그 데이타 ES 에 전달하도록 했습니다. 더하여 작업 완료 후 logstash 를 중단시키려고 했는데 이 부분이 조금 골치 아팠습니다.

logstash File Input plugin 에서 파일 읽기가 끝났는지 알아낼 수 있는 방법이 현재는 명확하게 제공되지 않는 듯 합니다. 스택오버플로우 검색해보니 sincedb 이용했다는 답글이 있네요.
sincedb 에 기록되는 정보 중 4번째 컬럼이 현재 읽고 있는 파일의 byte offset 입니다. 배치 파일로 파일 크기 구하는 방법이 있길래 테스트 해봤는데 sincedb 의 offset 값과 이 값이 일치하더군요.

for %%A in (%file%) do SET fbytes=%%~zA
echo %fbytes% bytes

sincedb offset 값과 대상 로그 파일 크기 비교하면 가능하겠다는건 확인했으니 구현해봐야죠. 

$log_file = $args[0]
Write-Host $log_file
$log_len = Get-Childitem $log_file | Select length
$log_len_size = $log_len.Length
$sdb_dir = "...\data\plugins\inputs\file"

while (!(Test-Path "$sdb_dir\.sincedb*")) { Start-Sleep 10 }
$offset = 0
Get-ChildItem $sdb_dir | select FullName | % {
    Do {
        Get-Content $_.FullName | ForEach-Object {
            $offset = $_.split(" ")[3]
        }

        sleep 5
        Write-Host "Size $offset - $log_len_size"
    }
    while ($offset -lt $log_len_size)
}

Write-Host "END $offset - $log_len_size"

PowerShell 스크립트입니다.

.sincedb*** 파일이 생성되지 않으면 10초 대기하도록 했고, 생성된 후에는 5초 간격으로 로그 파일과 offset 값을 비교하여 offset 값이 로그 파일 이상이 되면 끝나도록 했습니다. 실행 끝나면 파워쉘을 호출한 배치 스크립트에서 그 다음 로직(logstash 중단, 창닫기)을 실행하겠죠/

아래는 logstash 설정 파일입니다. 분석할 로그 파일을 동적으로 받아 처리할 수 있도록 했고 현재는 offset 필요 없으므로 sincedb 는 사용 안하도록 해놓았습니다. 

ee-log.yml 

input { 
  file  { 
    path => "${LOG_FILE}" 
    type => "ee-log" 
    codec => multiline { 
      pattern => "^[0-9]{4}-[0-9]{2}-[0-9]{2}" 
      what => "previous" 
      negate=> true 
    } 
    sincedb_path => "NUL" 
    start_position => "beginning" 
  } 
} 



filter { 
    grok { 
        match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}\s?(\[(?.+)\])? \[%{LOGLEVEL:loglevel}\s?\]\s?(\[%{WORD:logcat}\])?\s?(%{SYSLOGPROG:prg})? - ((((?[^{].*),\s?)?(?{.*))|(?.*))" } 
    } 

    if [jsonmsg] { 
        json { 
            source => "jsonmsg" 
            target => "jsonmsg" 
        } 
        json { 
            source => "[jsonmsg][msgJson]" 
            target => "[jsonmsg][msgJson]" 
        } 
    } 

    date { 
        match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ] 
        remove_field => ["timestamp"] 
    } 

} 

output { 
    elasticsearch { 
        hosts => ["${HOST}"] 
        index => "eep-log" 
        template => "${TMPL}" 
        template_overwrite => true 
    } 

    stdout { codec => rubydebug } 
}


로그 데이터 수집 처리는 어느 정도 해결되었고 담당자가 Search API 학습 없이 사용할 수 있도록 클라이언트 프로그램을 만드는 중인데 우선 로그 데이터가 제대로 들어갔는지 확인해볼 겸 elasticsearch_dsl 이용한 코드 간단히 작성해보았습니다. 

match 와 match_phrase, 각각 사용해보았는데 결과는 잘 나오네요.  

코딩 중에 필드명 지정해주는거로 시간 좀 허비했는데 json 데이터 등을 사용할 때 생기는 "." 들어간 필드명은 "__"(언더바 두 개)를 사용하면 됩니다. 

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, Q

def es_query():
    es = Elasticsearch(hosts="...", port=9200)
    idx = "eep-log",
    s = Search(index=idx, using=es)
    # q = Q("match", jsonmsg__msgJson__mchnId='T768')
    q = Q("match_phrase", logstr='ContainerException')
    log_msgs = s.query(q)

    for result in log_msgs:
        print(result.message)