Logs ☞ 03 ELK Basic Usage - Logstash Component 02 - Plugin Introduction

zyh · 2019-12-15 14:30:30

Introduction

Logstash's main job is to transform data.

The official documentation groups the mainstream plugins into four categories and walks through each:

https://www.elastic.co/guide/en/logstash/7.15/transformation.html

Core Operations

ℹ️ The examples below do not cover every feature of each plugin; see the official documentation for details.

date filter

filter {
  date {
    match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
  }
}

Parses the logdate field of the event according to the pattern MMM dd yyyy HH:mm:ss and uses the parsed time as the event's Logstash @timestamp. Commonly used to stamp ES index documents with the real event time.
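
Two other commonly used options are target and timezone; a minimal sketch, assuming the parsed time should go to a separate field and the source logs are written in UTC+8 (the field name event_time is hypothetical):

filter {
  date {
    match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
    # assumption: keep @timestamp untouched and store the parsed time here
    target => "event_time"
    # assumption: source logs use UTC+8 local time
    timezone => "Asia/Shanghai"
  }
}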

drop filter

filter {
  if [loglevel] == "debug" {
    drop { }
  }
}

Uses an if conditional to drop events whose loglevel field equals debug.
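
The drop filter also has a percentage option for sampling; a minimal sketch, assuming you want to keep part of the debug traffic instead of discarding it all:

filter {
  if [loglevel] == "debug" {
    drop {
      # drop roughly 40% of the matching events, keeping the rest as a sample
      percentage => 40
    }
  }
}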

fingerprint filter

filter {
  fingerprint {
    source => ["IP", "@timestamp", "message"]
    method => "SHA1"
    key => "0123"
    target => "[@metadata][generated_id]"
  }
}

Builds a unique ID in [@metadata][generated_id] by hashing the listed source fields together with the extra key, using the algorithm named in method (a keyed hash, i.e. HMAC, when key is set).
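
A common follow-up is to use the fingerprint as the Elasticsearch document ID so duplicate events overwrite each other instead of creating new documents; a sketch, assuming a local ES instance and a hypothetical index name:

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]             # assumption: local Elasticsearch
    index => "my-index"                            # hypothetical index name
    document_id => "%{[@metadata][generated_id]}"  # reuse the fingerprint as the document ID
  }
}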

mutate filter

filter {
  mutate {
    rename => { "HOSTORIP" => "client_ip" }
  }
}

Performs field operations such as rename, remove, replace, modify, and strip.
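
Several mutate operations can be combined in one block; a sketch (all field names here are hypothetical):

filter {
  mutate {
    rename => { "HOSTORIP" => "client_ip" }  # rename a field
    strip => ["client_ip"]                   # trim surrounding whitespace
    uppercase => ["loglevel"]                # normalize casing
    convert => { "bytes" => "integer" }      # cast a numeric string to an integer
    remove_field => ["tmp_field"]            # drop a field that is no longer needed
  }
}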

ruby filter

filter {
  ruby {
    code => "event.cancel if rand <= 0.90"
  }
}

Executes arbitrary Ruby code. In the example, event.cancel drops the event, so roughly 90% of events are discarded, keeping a 10% sample.
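
The Ruby event API also exposes event.get and event.set for reading and writing fields; a minimal sketch, assuming an upstream filter has set a numeric start_time field (hypothetical):

filter {
  ruby {
    # assumption: events carry a numeric start_time field set upstream
    code => "event.set('elapsed', Time.now.to_f - event.get('start_time').to_f)"
  }
}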

Deserializing Data

csv filter

filter {
  csv {
    separator => ","
    columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ]
  }
}

Splits CSV data on the separator and names the resulting fields via columns.
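
The csv filter can also cast columns to proper types with its convert option; a sketch reusing the columns above:

filter {
  csv {
    separator => ","
    columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ]
    # cast the numeric columns from strings to floats
    convert => {
      "Amount Debit" => "float"
      "Amount Credit" => "float"
      "Balance" => "float"
    }
  }
}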

xml filter

filter {
  xml {
    source => "message"
  }
}

Parses the message field of the event (here message holds an XML document).
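
In practice a target is usually given as well, so the parsed structure lands under one field rather than at the event root (recent versions require target when store_xml is true, the default); the field name doc below is hypothetical:

filter {
  xml {
    source => "message"
    # hypothetical field to hold the parsed XML structure
    target => "doc"
  }
}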

json codec

input {
  file {
    path => "/path/to/myfile.json"
    codec => "json"
  }
}

Codecs encode and decode data and are generally attached to inputs or outputs; besides json there are also protobuf, fluent, avro, and others.
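
Codecs work on the output side too; for example, the rubydebug codec pretty-prints events to stdout and is handy while developing a pipeline:

output {
  stdout {
    # pretty-print each event for debugging
    codec => rubydebug
  }
}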

Extracting Fields and Wrangling Data

dissect filter

Log data:

Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...

Configuration:

filter {
  dissect {
    mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
  }
}

Maps message into named groups; the new event fields are:

{
  "msg"        => "Starting system activity accounting tool...",
  "@timestamp" => 2017-04-26T19:33:39.257Z,
  "src"        => "localhost",
  "@version"   => "1",
  "host"       => "localhost.localdomain",
  "pid"        => "1",
  "message"    => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...",
  "type"       => "stdin",
  "prog"       => "systemd",
  "ts"         => "Apr 26 12:20:02"
}
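
Because dissect only produces strings, the extracted ts is often handed to a date filter afterwards to become the real event timestamp; a minimal sketch:

filter {
  dissect {
    mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" }
  }
  date {
    # parse the extracted syslog-style timestamp into @timestamp
    match => [ "ts", "MMM dd HH:mm:ss" ]
  }
}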

kv filter

Log data:

ip=1.2.3.4 error=REFUSED

Configuration:

filter {
  kv { }
}

New event fields:

ip: 1.2.3.4
error: REFUSED
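
kv also accepts a custom pair separator plus source/target fields, which is useful for URL query strings; a sketch with hypothetical field names:

filter {
  kv {
    source => "request_args"  # hypothetical field holding "a=1&b=2" style data
    field_split => "&"        # split pairs on & instead of whitespace
    target => "query"         # nest the parsed pairs under this field
  }
}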

grok filter

Similar to the dissect filter, except grok matches with regular-expression-based patterns while dissect splits on fixed delimiters.

Log data:

55.3.244.1 GET /index.html 15824 0.043

Configuration:

filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

New event fields:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

Debugging tool 1: http://grokdebug.herokuapp.com/patterns, which also includes a set of ready-made pattern examples.

Debugging tool 2: the Grok Debugger under Kibana → Dev Tools.

(Screenshot: Grok Debugger in Kibana Dev Tools)

Enriching Data

dns filter

Performs a reverse DNS lookup on the field listed under reverse and then applies the configured action; with action => "replace", the IP is overwritten by the resolved hostname.

filter {
  dns {
    reverse => [ "source_host" ]
    action => "replace"
  }
}

geoip filter

Adds an extra geoip field by looking up the geographic location of an IP address.

filter {
  geoip {
    source => "clientip"
  }
}
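
A hedged variant using the target option to control where the lookup results are stored (the field name client_geo is hypothetical):

filter {
  geoip {
    source => "clientip"
    # assumption: nest the lookup result here instead of the default geoip field
    target => "client_geo"
  }
}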

jdbc_static filter

Builds a local cache database from data in an external database, then enriches events by querying that local cache.

filter {
  jdbc_static {
    loaders => [ 
      {
        id => "remote-servers"
        query => "select ip, descr from ref.local_ips order by ip"
        local_table => "servers"
      },
      {
        id => "remote-users"
        query => "select firstname, lastname, userid from ref.local_users order by userid"
        local_table => "users"
      }
    ]
    local_db_objects => [ 
      {
        name => "servers"
        index_columns => ["ip"]
        columns => [
          ["ip", "varchar(15)"],
          ["descr", "varchar(255)"]
        ]
      },
      {
        name => "users"
        index_columns => ["userid"]
        columns => [
          ["firstname", "varchar(255)"],
          ["lastname", "varchar(255)"],
          ["userid", "int"]
        ]
      }
    ]
    local_lookups => [ 
      {
        id => "local-servers"
        query => "select descr as description from servers WHERE ip = :ip"
        parameters => {ip => "[from_ip]"}
        target => "server"
      },
      {
        id => "local-users"
        query => "select firstname, lastname from users WHERE userid = :id"
        parameters => {id => "[loggedin_userid]"}
        target => "user" 
      }
    ]
    # using add_field here to add & rename values to the event root
    add_field => { server_name => "%{[server][0][description]}" }
    add_field => { user_firstname => "%{[user][0][firstname]}" } 
    add_field => { user_lastname => "%{[user][0][lastname]}" }
    remove_field => ["server", "user"]
    jdbc_user => "logstash"
    jdbc_password => "example"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar"
    jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2"
  }
}

loaders: queries run against the external database

local_db_objects: the local tables that hold the data fetched by loaders

local_lookups: queries against the local tables; results are stored in the field named by target

add_field: adds new fields

remove_field: removes old fields

translate filter

Translates the value of an event field; a dictionary must be supplied.

filter {
  translate {
    field => "response_code"
    target => "http_response"
    dictionary => {
      "200" => "OK"
      "403" => "Forbidden"
      "404" => "Not Found"
      "408" => "Request Timeout"
    }
    remove_field => "response_code"
  }
}

field: the source field to translate

target: the field that receives the translated value

dictionary: the translation dictionary; the left side is the original value, the right side is its translation
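
For larger dictionaries, translate can load an external file via dictionary_path, and fallback supplies a default when no entry matches; a sketch with a hypothetical file path:

filter {
  translate {
    field => "response_code"
    target => "http_response"
    # hypothetical YAML dictionary file mapping codes to descriptions
    dictionary_path => "/etc/logstash/http_codes.yml"
    # used when the dictionary has no matching entry
    fallback => "Unknown"
  }
}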

useragent filter

Parses the user-agent string into structured key/value fields.

filter {
  useragent {
    source => "agent"
    target => "user_agent"
    remove_field => "agent"
  }
}

New event field:

        "user_agent": {
          "os": "Mac OS X 10.12",
          "major": "50",
          "minor": "0",
          "os_minor": "12",
          "os_major": "10",
          "name": "Firefox",
          "os_name": "Mac OS X",
          "device": "Other"
        }