Logstash + Kafka (input source) + Elasticsearch


Related documentation: https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/get_start/introduction.html

1. Install Logstash

Download
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.0.tar.gz

Extract and move into place
tar -zxvf logstash-7.3.0.tar.gz
mv logstash-7.3.0 /usr/local/bin/logstash

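2. Configure Kafka

2.1. Install the Kafka input/output plugins
Run these commands from the Logstash directory (/usr/local/bin/logstash). Both plugins are normally bundled with the default Logstash distribution, so installation is only needed if bin/logstash-plugin list does not show them.
Input plugin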
bin/logstash-plugin install logstash-input-kafka

Validating logstash-input-kafka
Installing logstash-input-kafka
Installation successful

Output plugin
bin/logstash-plugin install logstash-output-kafka

Validating logstash-output-kafka
Installing logstash-output-kafka
Installation successful
2.2. Configure the pipeline (Kafka as the input source)
Create the file first-pipeline.conf:
vim first-pipeline.conf

input {
        kafka {
                bootstrap_servers => "localhost:9092" # Kafka broker host and port
                topics => ["test"] # topic name(s) to subscribe to
                group_id => "ttt"
        }
}
filter {
}

output {
        elasticsearch {
                hosts => ["localhost:9200"] # Elasticsearch host and port
                index => "index" # index name
        }
        stdout { codec => rubydebug } # pretty-print each event to the console
}

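3. Validate the configuration

The --config.test_and_exit flag parses the configuration, reports any errors, and exits: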
bin/logstash -f first-pipeline.conf --config.test_and_exit

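4. Start Logstash

With --config.reload.automatic, Logstash watches the configuration file and reloads it when it changes: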
bin/logstash -f first-pipeline.conf --config.reload.automatic

5. Filtering and transforming events

Grok patterns can be tested online at:
http://grokdebug.herokuapp.com/

filter {
        json {
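                source => "message" # parse the JSON string in message into event fields
        }
        grok {
                # Map the matched parts of message onto named fields.
                # NGINX_ACCESS_LOG is assumed to be a custom pattern added as described in section 6.
                match => { "message" => "%{HOSTNAME:logserver} %{PATH:logpath} %{NGINX_ACCESS_LOG}"}
                remove_field  => "message" # drop the raw field after a successful match
                add_field => {
                    "name" => "xxx" # add a field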
                }
        }
        geoip {
                source => "clientip"
        }
}
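
When every message on the topic is a JSON document, decoding can also happen at the input through a codec, which makes the separate json filter unnecessary. The following is a minimal sketch under that assumption. It reuses the broker address, topic, and consumer group from the configuration above, and it is an alternative to consider rather than the setup this article uses.

```
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["test"]
    group_id => "ttt"
    # Assumption: every message on the topic is a JSON document.
    # The json codec decodes each message into event fields at input time.
    codec => "json"
  }
}
```

With this codec, a message that is not valid JSON is kept as plain text and tagged _jsonparsefailure, so a conditional on [tags] can route such events separately.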

6. Adding custom grok patterns

Add or modify pattern files under /usr/local/bin/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
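
Files inside the bundled gem directory can be overwritten when the plugin is updated, so it may be safer to keep custom patterns in a directory of your own and point grok at it with the patterns_dir option. The sketch below assumes a hypothetical directory /usr/local/bin/logstash/patterns containing a file named nginx. The NGINX_ACCESS_LOG definition shown is an illustrative guess for the nginx combined log format, not the author's actual pattern, which the article does not show.

```
# /usr/local/bin/logstash/patterns/nginx (hypothetical file; one "NAME regex" definition per line)
NGINX_ACCESS_LOG %{IPORHOST:clientip} - %{DATA:user_name} \[%{HTTPDATE:time}\] "%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}" %{NUMBER:response_code} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:agent}"
```

```
filter {
  grok {
    # Load custom pattern definitions in addition to the bundled ones
    patterns_dir => ["/usr/local/bin/logstash/patterns"]
    match => { "message" => "%{NGINX_ACCESS_LOG}" }
  }
}
```

The clientip field name is chosen here so that it lines up with the geoip source used in section 5.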

7. Pipeline configuration example

Log file: /var/log/nginx/access.log
192.168.1.176 - - [21/Aug/2019:22:27:00 +0800] "GET / HTTP/1.1" 500 5 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36" "-"
The message field before Logstash transforms it:
"message" : "192.168.1.176 - - [21/Aug/2019:22:50:40 +0800] \"GET / HTTP/1.1\" 500 5 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36\" \"-\"",

Pipeline configuration: first-pipeline.conf
input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["test"]
        group_id => "ttt"
        type => "nginx-access-log"
    }
}
filter {
  # Parse the JSON string in message into event fields
  json {
    source => "message"
  }
  if [type] == "nginx-access-log" {
    grok {
      match => { "message" => ["(?<RemoteIP>\d+\.\d+\.\d+\.\d+) - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
      # Fields are removed only when the grok match succeeds
      remove_field => ["message", "beat", "host", "source", "prospector", "log", "input", "@version", "offset", "type", "@timestamp"]
    }
  }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "index"
    }
    stdout { codec => rubydebug }
}


Result:
{
    "RemoteIP" => "192.168.1.176",
    "nginx" => {
        "access" => {
            "url" => "/",
            "body_sent" => {
                "bytes" => "5"
            },
            "time" => "22/Aug/2019:18:13:36 +0800",
            "method" => "GET",
            "response_code" => "500",
            "agent" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36",
            "user_name" => "-",
            "referrer" => "-",
            "http_version" => "1.1"
        }
    }
}
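
Every captured value in the result above is a string, including the status code, the byte count, and the request time. If typed values are wanted in Elasticsearch, one possible follow-up is to convert them after the grok stage. This is a sketch that is not part of the original pipeline. It assumes the field names produced above, and it writes the parsed date back into [nginx][access][time] instead of @timestamp, since the pipeline removes @timestamp.

```
filter {
  # Parse the nginx time string into a real timestamp, stored in the same field
  date {
    match => ["[nginx][access][time]", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "[nginx][access][time]"
  }
  # Convert numeric captures from strings to integers
  mutate {
    convert => {
      "[nginx][access][response_code]" => "integer"
      "[nginx][access][body_sent][bytes]" => "integer"
    }
  }
}
```

If the index already holds documents with these fields mapped as text, the mapping of those existing fields cannot be changed in place, so a new index may be needed for the typed values.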