ELK-Logstash

Run Logstash on Docker

docker pull docker.elastic.co/logstash/logstash-oss:7.9.1
docker run --rm -it \
    -p 5044:5044 \
    -v /data/yzchen/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ \
    docker.elastic.co/logstash/logstash-oss:7.9.1
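
To validate the mounted pipeline files without actually starting the pipeline, the flags below can be passed to the container (a sketch; it assumes the image's default entrypoint forwards flag arguments to the logstash binary):

docker run --rm -it \
    -v /data/yzchen/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ \
    docker.elastic.co/logstash/logstash-oss:7.9.1 \
    --config.test_and_exit -f /usr/share/logstash/pipeline/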

Logstash Config

input {
# Filebeat input
beats {
  port => "5044"
}

# Redis input
redis {
  host => "host_ip"
  port => 6379                  # default Redis port
  password => "password"
  data_type => "channel"        # use "pattern_channel" to subscribe with a glob pattern
  key => "channel_name"
}
} # close the input block

filter {
# drop events: keep only events whose [type] is "raw"
if [type] != "raw" {
  drop { }
}

# parse json string
json {
  source => "msgdata"
}

# drop field
prune {
  blacklist_names => ["msgdata"]
}
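
# example (hypothetical event): [msgdata] = '{"user": "alice", "action": "login"}'
# -> the json filter adds [user] and [action] at the top level,
#    and prune then removes the now-redundant [msgdata] field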

# check whether a field exists
if [foo] { }         # for numeric fields
if "" in [foo] { }   # for other field types (e.g. strings)

# dissect
dissect {
  mapping => {
    "line" =>  "%{month} %{day} %{time} %{id} %{?NET_CONNTRACK->}[%{type}]%{rest}"
  }
}
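
# example (hypothetical line): "Apr 26 08:15:02 fw01 kernel:[warn] connection dropped"
# -> month="Apr", day="26", time="08:15:02", id="fw01", type="warn",
#    rest=" connection dropped"; the %{?NET_CONNTRACK->} part ("kernel:") is skipped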

# select from array and dict
mutate {
  update => {"new_field" => "%{[array_name][0][key_name]}"}
}
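
# e.g. with [array_name] = [{"key_name": "v1"}, ...], [new_field] becomes "v1"
# (update only changes an existing field; use replace / add_field to create one)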

# query MySQL and add a field from the result
jdbc_streaming {
  jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.34.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://host:3306/database"
  jdbc_user => "username"
  jdbc_password => "secret"
  statement => "SELECT col FROM table WHERE name = :name"
  parameters => { "name" => "name"}
  target => "col"
}
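
# the query result lands in [col] as an array of row hashes,
# so the first row's value is available as [col][0][col]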

# query Elasticsearch and add a field from the matching document
# note: the query here uses Elasticsearch's query string syntax, see
# https://www.elastic.co/guide/en/elasticsearch/reference/7.12/query-dsl-query-string-query.html#query-string-syntax
elasticsearch {
  hosts => ["host:9200"]
  index => "python-log-auth-*"
  query => 'type:auth AND name:"%{[name]}"'
  fields => { "device" => "device" }
  sort => "@timestamp:desc"
}
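
# copies the "device" field of the best match (newest by @timestamp)
# into the [device] field of the current event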

# ruby: run arbitrary Ruby code on the event
# here: [message] holds an epoch timestamp in microseconds; store it in [date]
# as fractional seconds (e.g. "1618243200123456" -> "1618243200.123456")
ruby {
  code => "event.set('date', (event.get('message')[0..-7].to_f + event.get('message')[-6..-1].to_f / 1000000).to_s)"
}
}

output {
# debug output
stdout { codec => rubydebug }
} # close the output block
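
For reference, a typical production output sends events to Elasticsearch; a minimal sketch (host and index name are assumptions, not taken from this setup):

output {
  elasticsearch {
    hosts => ["es_host:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}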

Monitoring

curl -XGET 'localhost:9600/_node/stats/<ps|jvm|process|events|pipelines|reloads>'
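
For example, per-pipeline event and plugin statistics, pretty-printed (note that port 9600 is not published by the docker run command above; add -p 9600:9600 to query it from the host):

curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'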

References