# ELK-Logstash
## Run Logstash on Docker
# Fetch the Logstash OSS 7.9.1 image from the Elastic registry.
docker pull docker.elastic.co/logstash/logstash-oss:7.9.1

# Start a throwaway interactive container:
#   --rm                  remove the container when it exits
#   --interactive --tty   keep stdin open and attach a terminal
#   --publish 5044:5044   expose port 5044 (used by the beats input in the config)
#   --volume              mount the host pipeline directory over the container's pipeline directory
docker run --rm --interactive --tty \
  --publish 5044:5044 \
  --volume /data/yzchen/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ \
  docker.elastic.co/logstash/logstash-oss:7.9.1
## Logstash Config
# Event sources for the pipeline.
input {
  # Filebeat input: accept Beats connections on port 5044
  # (the same port published by the docker run command).
  beats {
    port => "5044"
  }

  # Redis input: subscribe to a Redis pub/sub channel.
  redis {
    host => "host_ip"        # placeholder: Redis server address
    port => 6379             # placeholder: Redis port (6379 is the Redis default)
    password => "password"   # placeholder: Redis AUTH password
    # "channel" subscribes to the exact channel name given in `key`.
    # NOTE(review): if `key` is meant to be a glob pattern, data_type
    # should be "pattern_channel" instead — confirm intended usage.
    data_type => "channel"
    key => "channel_pattern"
  }
}
# Collection of filter snippets: dropping, JSON parsing, pruning, conditionals,
# dissect, field extraction, and enrichment from MySQL / Elasticsearch / Ruby.
filter {
  # Drop every event whose type is not "raw".
  if [type] != "raw" {
    drop { }
  }

  # Parse the JSON string held in the "msgdata" field into event fields.
  json {
    source => "msgdata"
  }

  # Remove the "msgdata" field once it has been parsed.
  prune {
    blacklist_names => ["msgdata"]
  }

  # Check whether a field exists.
  # True when [foo] exists and is neither null nor false (suits numeric fields).
  if ([foo]) { }
  # True when [foo] is a string: the empty string is a substring of any string.
  if ("" in [foo]) { }

  # Dissect: split "line" on fixed delimiters.
  # %{?NET_CONNTRACK->} is a skipped field; "->" tolerates repeated padding after it.
  dissect {
    mapping => {
      "line" => "%{month} %{day} %{time} %{id} %{?NET_CONNTRACK->}[%{type}]%{rest}"
    }
  }

  # Select a value from an array of dicts and store it in a field.
  # `replace` is used instead of `update`: `update` does nothing when the
  # target field does not already exist, while `replace` creates it if needed.
  mutate {
    replace => { "new_field" => "%{[array_name][0][key_name]}" }
  }

  # Query MySQL and add the result as a field.
  # `parameters` binds :name in the statement to the event's "name" field;
  # the query result is stored in the "col" field.
  jdbc_streaming {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.34.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://host:3306/database"
    jdbc_user => "username"
    jdbc_password => "secret"
    statement => "SELECT col FROM table WHERE name = :name"
    parameters => { "name" => "name" }
    target => "col"
  }

  # Query Elasticsearch and add fields from the matching document.
  # Note: `query` uses the Elasticsearch query-string syntax. See
  # https://www.elastic.co/guide/en/elasticsearch/reference/7.12/query-dsl-query-string-query.html#query-string-syntax
  elasticsearch {
    hosts => ["host:9200"]
    index => "python-log-auth-*"
    query => 'type:auth AND name:"%{[name]}"'
    # Copy "device" from the matched document into the event's "device" field.
    fields => { "device" => "device" }
    # Newest document first.
    sort => "@timestamp:desc"
  }

  # Ruby: build "date" from "message" by treating all but the last six
  # characters as the integer part and the last six as millionths.
  # NOTE(review): presumably "message" is a microsecond epoch timestamp — confirm.
  ruby {
    code => "event.set('date', (event.get('message')[0..-7].to_f + event.get('message')[-6..-1].to_f / 1000000).to_s)"
  }
}
# Event destinations for the pipeline.
output {
  # Debug output: pretty-print every event to stdout.
  stdout { codec => rubydebug }
}
# Logstash monitoring API: fetch node stats. Replace the <...> placeholder
# with one of the listed stat types (e.g. .../_node/stats/jvm).
curl -XGET 'localhost:9600/_node/stats/<os|jvm|process|events|pipelines|reloads>'
## References