Introduction
The main job of Logstash is to transform data.
The official documentation groups the mainstream plugins into four categories and introduces each of them.
https://www.elastic.co/guide/en/logstash/7.15/transformation.html
Core Operations
ℹ️ The examples below do not cover every option of each plugin; see the official documentation for full details.
date filter
filter {
date {
match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
}
}
Parses the logdate event field against the pattern MMM dd yyyy HH:mm:ss and uses the parsed time as the event's Logstash timestamp (@timestamp). This is commonly used to give Elasticsearch indexes the real event timestamp.
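For example, with a hypothetical event whose logdate field is "Apr 17 2022 09:32:01", a minimal sketch of the effect (the exact @timestamp also depends on the timezone option):
# before the filter (hypothetical value)
"logdate" => "Apr 17 2022 09:32:01"
# after the filter, the parsed time replaces the default @timestamp
"@timestamp" => 2022-04-17T09:32:01.000Z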
drop filter
filter {
if [loglevel] == "debug" {
drop { }
}
}
Uses an if condition to drop any event whose loglevel field is debug.
fingerprint filter
filter {
fingerprint {
source => ["IP", "@timestamp", "message"]
method => "SHA1"
key => "0123"
target => "[@metadata][generated_id]"
}
}
Combines the listed source fields of the event with an extra key and hashes them using the configured method (SHA1 here) to build a unique ID stored in [@metadata][generated_id].
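A common follow-up (a hedged sketch, not part of the original config) is to use the generated fingerprint as the Elasticsearch document ID so that replayed events overwrite rather than duplicate; the hosts value below is an assumption:
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    document_id => "%{[@metadata][generated_id]}"
  }
}
Because the target lives under @metadata, the hash itself is not indexed as part of the document.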
mutate filter
filter {
mutate {
rename => { "HOSTORIP" => "client_ip" }
}
}
Performs operations on event fields such as rename, remove, replace, modify, and strip; a combined sketch follows below.
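A minimal sketch combining several of these operations in one mutate block (all field names other than HOSTORIP/client_ip are hypothetical):
filter {
  mutate {
    rename => { "HOSTORIP" => "client_ip" }                  # rename a field
    replace => { "message" => "%{client_ip}: %{message}" }   # overwrite a field's value
    strip => ["client_ip"]                                   # trim surrounding whitespace
    convert => { "bytes" => "integer" }                      # change a field's type
    remove_field => ["tmp_field"]                            # drop a field entirely
  }
}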
ruby filter
filter {
ruby {
code => "event.cancel if rand <= 0.90"
}
}
Executes arbitrary Ruby code; in this example, roughly 90% of events are randomly cancelled (dropped).
Deserializing Data
csv filter
filter {
csv {
separator => ","
columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ]
}
}
Splits CSV data on the configured separator and names the resulting fields according to columns.
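For instance, given a hypothetical input line in the message field such as:
1001,2021-09-01,Coffee,3.50,0.00,96.50
the filter above would add string fields along these lines (values stay strings unless the csv filter's convert option is used):
"Transaction Number" => "1001"
"Date" => "2021-09-01"
"Description" => "Coffee"
"Amount Debit" => "3.50"
"Amount Credit" => "0.00"
"Balance" => "96.50"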
xml filter
filter {
xml {
source => "message"
}
}
Parses the message field of the event (here the message field holds an XML document).
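The parsed result can also be placed under a dedicated field via the target option; a minimal sketch (the field name doc is an assumption):
filter {
  xml {
    source => "message"
    target => "doc"
  }
}
Note that by default the xml filter stores parsed values as arrays (force_array defaults to true), which matters when referencing them later.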
json codec
input {
  file {
    path => "/path/to/myfile.json"
    codec => "json"
  }
}
Codecs are generally attached to inputs or outputs; besides json there are also protobuf, fluent, avro, and others.
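Codecs work on outputs as well as inputs; a minimal sketch that serializes events back to JSON on stdout (the stdout output is chosen here purely for illustration):
output {
  stdout {
    codec => json
  }
}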
Extracting Fields and Wrangling Data
dissect filter
Log data:
Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...
filter {
dissect {
mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
}
}
Maps the message field into named segments; the resulting event fields are:
{
"msg" => "Starting system activity accounting tool...",
"@timestamp" => 2017-04-26T19:33:39.257Z,
"src" => "localhost",
"@version" => "1",
"host" => "localhost.localdomain",
"pid" => "1",
"message" => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...",
"type" => "stdin",
"prog" => "systemd",
"ts" => "Apr 26 12:20:02"
}
kv filter
Log data:
ip=1.2.3.4 error=REFUSED
Configuration:
filter {
kv { }
}
New event fields:
ip: 1.2.3.4
error: REFUSED
grok filter
Similar to the dissect filter.
Log data:
55.3.244.1 GET /index.html 15824 0.043
Configuration:
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
New event fields:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
Debugging tool 1: http://grokdebug.herokuapp.com/patterns, which also ships a collection of ready-made patterns and examples.
Debugging tool 2: the Grok Debugger built into Kibana under Dev Tools.
Enriching Data
dns filter
Performs a reverse DNS lookup on the IP stored in the field and then applies the configured action, for example replace, which substitutes the resolved domain name for the IP.
filter {
dns {
reverse => [ "source_host" ]
action => "replace"
}
}
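A sketch of the effect, assuming source_host initially holds an IP address (values are illustrative):
# before
"source_host" => "8.8.8.8"
# after the reverse lookup with action => "replace"
"source_host" => "dns.google"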
geoip filter
Looks up the geographic location of an IP address and adds an extra geoip field to the event.
filter {
geoip {
source => "clientip"
}
}
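The added geoip field typically looks roughly like the following (values are illustrative, and the exact sub-fields depend on the GeoIP database used):
"geoip" => {
  "ip" => "128.101.101.101",
  "country_name" => "United States",
  "region_name" => "Minnesota",
  "city_name" => "Minneapolis",
  "location" => { "lat" => 44.98, "lon" => -93.2638 }
}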
jdbc_static filter
Builds a local cache database from data in an external database, then enriches events by querying that local cache; the main options are explained after the example.
filter {
jdbc_static {
loaders => [
{
id => "remote-servers"
query => "select ip, descr from ref.local_ips order by ip"
local_table => "servers"
},
{
id => "remote-users"
query => "select firstname, lastname, userid from ref.local_users order by userid"
local_table => "users"
}
]
local_db_objects => [
{
name => "servers"
index_columns => ["ip"]
columns => [
["ip", "varchar(15)"],
["descr", "varchar(255)"]
]
},
{
name => "users"
index_columns => ["userid"]
columns => [
["firstname", "varchar(255)"],
["lastname", "varchar(255)"],
["userid", "int"]
]
}
]
local_lookups => [
{
id => "local-servers"
query => "select descr as description from servers WHERE ip = :ip"
parameters => {ip => "[from_ip]"}
target => "server"
},
{
id => "local-users"
query => "select firstname, lastname from users WHERE userid = :id"
parameters => {id => "[loggedin_userid]"}
target => "user"
}
]
# using add_field here to add & rename values to the event root
add_field => { server_name => "%{[server][0][description]}" }
add_field => { user_firstname => "%{[user][0][firstname]}" }
add_field => { user_lastname => "%{[user][0][lastname]}" }
remove_field => ["server", "user"]
jdbc_user => "logstash"
jdbc_password => "example"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
}
}
loaders: queries the external database
local_db_objects: maps the loaders' result sets into local tables
local_lookups: queries the local tables and stores the results in the field specified by target
add_field: adds new fields
remove_field: removes the old fields
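With the configuration above, an event carrying from_ip and loggedin_userid would end up with enrichment fields roughly like these (values are purely illustrative):
"server_name" => "edge-proxy-01"
"user_firstname" => "Jane"
"user_lastname" => "Doe"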
translate filter
Translates the value of an event field using a user-supplied dictionary.
filter {
translate {
field => "response_code"
target => "http_response"
dictionary => {
"200" => "OK"
"403" => "Forbidden"
"404" => "Not Found"
"408" => "Request Timeout"
}
remove_field => "response_code"
}
}
field: the source field
target: the field that receives the translated value
dictionary: the translation dictionary; the left side is the original value, the right side is the translated value
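A sketch of the before/after effect (values are illustrative):
# before
"response_code" => "404"
# after the filter (response_code is dropped by remove_field)
"http_response" => "Not Found"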
useragent filter
Parses the user-agent string into structured key/value fields.
filter {
useragent {
source => "agent"
target => "user_agent"
remove_field => "agent"
}
}
New event fields:
"user_agent": {
"os": "Mac OS X 10.12",
"major": "50",
"minor": "0",
"os_minor": "12",
"os_major": "10",
"name": "Firefox",
"os_name": "Mac OS X",
"device": "Other"
}