一、如何运行 Logstash
在 Mac, Unix 及 Linux下,可以使用如下的方式来进行运行:
bin/logstash [options]
在 Windows 环境下,我们使用如下的方式来运行:
bin/logstash.bat [options]
bin/logstash -e ""
在通常情况下,需要跟上一些选项才可以启动 Logstash,否则它会退出。除非有一种情况,在启动 monitoring
后,可以不添加选项来启动 Logstash。
logstash命令常用参数
-n
指定logstash实例名称,如果没有指定,默认是本地主机名-f
从指定的文件或文件夹中加载logstash的配置;如果是一个文件夹,它会加载里面所有的文件,或者可以加上通配符只加载特定格式的文件-w
允许filter和output的pipeline线程数量,默认是CPU核数-b
每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数,默认是 125 条。越大性能越好,同样也会消耗越多的 JVM 内存-u
每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms-l
指定日志输出位置-r
监控配置配置文件,如果有变化则自动重载logstash-e
使用给定的命令行字符串作为配置,直接运行bin/log/logstash -e 配置默认是"input { stdin { type => stdin } }" "output { stdout { codec => rubydebug } }"-t
检查配置文件是否有语法错误-V
打印logstash版本--log.level
指定日志等级,包括trace、debug、info、warn、error、fatal,默认info--http.host
指定web API绑定的主机,默认127.0.0.1--http.port
指定web API的http端口,默认9600至9700
二、如何配置 Logstash pipeline
Logstash 管道有两个必需元素,输入(inputs)和输出(ouputs),以及一个可选元素 filters。 输入插件使用来自源的数据,过滤器插件在你指定时修改数据,输出插件将数据写入目标。
要测试 Logstash 安装,请运行最基本的 Logstash 管道。 例如:
cd logstash-7.6.2
bin/logstash -e 'input { stdin { } } output { stdout {} }'
三、示例
将使用一个例子来一步一步地详细介绍如何使用 Logstash 来实现数据处理。
1)首先启动 Elasticsearch 及 Kibana。请参照之前的步骤运行 Elasticsearch 及 Kibana。
2)进入到 Logstash 安装目录,并修改 config/logstash.yml
文件。把 config.reload.automatic
设置为 true。(这样设置的好处是,每当修改完配置文件后,不需要每次都退出 Logstash,然后再重新运行。Logstash 会自动侦测到最新的配置文件的变化。)
3)创建一个叫做 weblog.conf
的配置文件,并输入一下的内容:
input {
tcp {
port => 9900
}
}
output {
stdout { }
}
4)运行 Logstash
bin/logstash -f weblog.conf
使用 nc 应用把数据发送到 TCP 端口号 9900,并查看 console 的输出。在另外一个 console 中打入如下的命令:
echo 'hello logstash' | nc localhost 9900
在 Logstash 运行的 console 里可以看到输出:
上面说明 TCP input 运行是正常的。
5)下载 Weblog 文件并发送给 Logstash 。
可以在地址 https://github.com/liu-xiao-guo/beats-getstarted
下载一个叫做 weblog-sample.log
的文件。这个文件有 64.5M 的大下。我们把这个文件保存于 Logstash 的安装目录中。它里面的其中的一个 log 的内容如下:
14.49.42.25 - - [12/May/2019:01:24:44 +0000] "GET /articles/ppp-over-ssh/ HTTP/1.1" 200 18586 "-" "Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2b1) Gecko/20091014 Firefox/3.6b1 GTB5"
使用如下的命令来读取第一行,并输入到 TCP 9900 端口:
head -n 1 weblog-sample.log | nc localhost 9900
那么在 Logstash 运行的 console 中,可以看到如下的输出:
这显示是第一行的那条 Weblog 信息。在这里,没有对数据进行任何的处理。它只是把第一行日志读出来,并把它都赋予给 message 这个字段。
四、运用过滤器来对数据进行处理
分别使用一些过滤器来对数据进行分别处理。
4.1、Grok
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
stdout { }
}
执行:
head -n 1 weblog-sample.log | nc localhost 9900
这个时候,再在 Logstash 运行的 console 里,可以看到:
也就是说,通过 Grok 这个 filter,它通过正则表达式进行匹配,并把输入的非结构化的数据变为一个结构化的数据。从上面,可以看到各种被提取的字段,比如 clientip, port, host 等等。
4.2、Geoip
尽管上面的数据从非结构化变为结构化数据,这是非常好的,但是还是有美中不足的地方。比如 clientip,我们知道了这个请求的 IP 地址,但是我们还是不知道这个 IP 是从哪个地方来的,具体是哪个国家,哪个地理位置。在这个时候,我们需要使用 geoip 过滤器来对数据进行丰富。我们在 filter 的这个部分加入 geoip。
weblog.conf
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
}
output {
stdout { }
}
同样地,使用如下的命令来发送日志里的第一条数据:
head -n 1 weblog-sample.log | nc localhost 9900
这个时候,我们可以看到除了在上面的 clientip 信息之外, 我们的数据多了一个新的叫做 geoip 的字段。它里面含有 location 位置信息。这就为我们在地图上进行显示这些数据提供了方便。可以利用 Elastic 所提供的地图清楚地查看到请求是来自哪里。
特别注意:针对 Elastic Stack 8.x 发行版,为了 ECS 的兼容性,需要修改上面的 geoip 过滤器如下:
geoip {
source => "[source][address]"
target => "geoip"
}
4.3、Useragent
上面的数据比以前更加丰富。还注意到 agent 这个字段。它非常长,我们没法查看出来是来自什么样的浏览器,什么语言等等信息。可以使用 useragent 这个过滤器来进一步丰富数据。在 weblog.conf 中添加这个过滤器:
weblog.conf
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "useragent"
}
}
output {
stdout { }
}
执行:
head -n 1 weblog-sample.log | nc localhost 9900
可以在上面看到一个新增加的字段 useragent。上面它表明了 useragent 的版本信息,浏览器的名称以及操作系统。这对于我们以后的数据分析提供更进一步的帮助。
4.4、Mutate - convert
从上面的输出中,可以看出来 bytes 是一个字符串的类型。这个和我们实际的应用可能会有所不同。这应该是一个整型数。可以使用 mutate: convert 过滤器来对它进行转换。重新编辑 weblog.conf 文件。把它放于 grok 过滤器之后:
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
mutate {
convert => {
"bytes" => "integer"
}
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "useragent"
}
}
output {
stdout { }
}
等更新完这个配置文件后,再次在另外一个 console 中发送第一个 log:
head -n 1 weblog-sample.log | nc localhost 9900
从上面的输出中,可以看到 bytes 这个字段已经变为整数值了,而不是之前的字符串了。
4.5、Date
Logstash 将事件时间存储在 @timestamp 字段中。 但是实际的日志创建时间在 timestamp 字段中(没有@)。 该字段的格式不是 ISO8601,因此存储为文本。可以使用 date 过滤器将此字段转换为日期类型。我们编辑 weblog.conf,并加入 date 过滤器:
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
mutate {
convert => {
"bytes" => "integer"
}
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "useragent"
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {
stdout { }
}
等更新完这个配置文件后,我们再次在另外一个 console 中发送第一个 log:
head -n 1 weblog-sample.log | nc localhost 9900
从上面,看出来新添加了一个叫做 @timestamp 的字段。
五、设置输出 - Elasticsearch
所有的到目前为止,所有的输出都是 stdout,也就是输出到 Logstash 运行的 console。想把处理后的数据输出到 Elasticsearch。在 output 的部分添加如下的 Elasticsearch 输出:
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
mutate {
convert => {
"bytes" => "integer"
}
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "useragent"
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {
stdout { }
elasticsearch {
hosts => ["localhost:9200"]
user => "elastic"
password => "changeme"
}
}
当 Logstash 通过 elasticsearch 输出插件向 Elasticsearch 发送数据时,如果目标索引不存在,Elasticsearch 会根据提供的数据自动创建索引。
Logstash 默认会使用 logstash-%{+YYYY.MM.dd}
作为索引名称,即按照当前日期生成新的索引。
在上面,同时保留两个输出:stdout 及 elasticsearch。事实上,可以定义很多个的输出。stdout 输出对于初期的调试是非常有帮助的。等完善了所有的调试,可以把上面的 stdout 输出关掉。依赖于是否已经为 Elasticsearch 提供安全设置,需要在上面配置好访问的用户名及密码。
等更新完这个配置文件后,再次在另外一个 console 中发送第一个 log:
head -n 1 weblog-sample.log | nc localhost 9900
GET logstash/_search
从上面可以看到这条 Logstash 导入的数据。它和之前在 Longstash console 里看到的是一摸一样的。
六、启用 keystore 来保护自己的密码等
在上面所有的配置中,我们在配置文件中把自己的用户名及密码都写在文本中间。这个是非常不好的,这是因为任何可以接触到这个配置文件的人都可以看到你这些敏感信息。为此,logstash-keystore 提供了一种安全机制。它允许我们把这些信息保存于一个 keystore 里,这样别人都看不到真实的用户名及密码等信息。
在 Logstash 的 console 里打入如下的命令:
bin/logstash-keystore create
显示:
$ bin/logstash-keystore create
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
2020-05-08T10:38:35.483+08:00 [main] WARN FilenoUtil : Native subprocess control requires open access to sun.nio.ch
Pass '--add-opens java.base/sun.nio.ch=org.jruby.dist' or '=org.jruby.core' to enable.
WARNING: The keystore password is not set. Please set the environment variable `LOGSTASH_KEYSTORE_PASS`. Failure to do so will result in reduced security. Continue without password protection on the keystore? [y/N] y
Created Logstash keystore at /Users/liuxg/elastic3/logstash-7.6.2/config/logstash.keystore
接着打入如下的命令:
bin/logstash-keystore add ES_HOST
把配置文件中的 Elasticsearch 的地址 localhost:9200 拷贝并粘贴过来:
$ bin/logstash-keystore add ES_HOST
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
2020-05-08T10:39:41.884+08:00 [main] WARN FilenoUtil : Native subprocess control requires open access to sun.nio.ch
Pass '--add-opens java.base/sun.nio.ch=org.jruby.dist' or '=org.jruby.core' to enable.
Enter value for ES_HOST:
Added 'es_host' to the Logstash keystore.
再接着执行如下的命令:
bin/logstash-keystore add LS_USER
显示:
$ bin/logstash-keystore add LS_USER
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
2020-05-08T10:41:41.603+08:00 [main] WARN FilenoUtil : Native subprocess control requires open access to sun.nio.ch
Pass '--add-opens java.base/sun.nio.ch=org.jruby.dist' or '=org.jruby.core' to enable.
Enter value for LS_USER:
Added 'ls_user' to the Logstash keystore.
在上面输入我们的 Logstash 的用户名。这个用户名可以是那个超级用户 elastic,也可以是我们自己创建的一个专为数据采集的用户。
最后,我们也可以打入如下的命令:
bin/logstash-keystore add LS_PWD
把上面用户名的密码进行输入:
$ bin/logstash-keystore add LS_PWD
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
2020-05-08T10:48:29.166+08:00 [main] WARN FilenoUtil : Native subprocess control requires open access to sun.nio.ch
Pass '--add-opens java.base/sun.nio.ch=org.jruby.dist' or '=org.jruby.core' to enable.
Enter value for LS_PWD:
Added 'ls_pwd' to the Logstash keystore.
在这里必须指出的是:上面我使用的 ES_HOST, LS_USER 及 LS_PWD
都是你自己任意可以选取的名字。只要它们和下边所使用的配置里的名字是配合的即可。
你可以使用如下的命令来查看你已经创建的 key:
./bin/logstash-keystore list
在上面,我们已经创建了一下键值,那么该如何使用它们呢?
input {
tcp {
port => 9900
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
mutate {
convert => {
"bytes" => "integer"
}
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "useragent"
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {
stdout { }
elasticsearch {
hosts => ["${ES_HOST}"]
user => "${LS_USER}"
password => "${LS_PWD}"
}
}
在上面的 elasticsearch 输出部分,我们分别使用了 ES_HOST, LS_USER 及 LS_PWD 来分别代替了之前使用的字符串。这样做的好处是,我们再也不用硬编码我们的这些字符串了。
经过这个修改后,我们重新运行 Logstash:
bin/logstash -f weblog.conf
七、把整个文件都导入进 Elasticsearch
到目前为止,我们只是测试了我们的 Logstash 的配置文件部分。可能很多的人觉得并不完整。如果你想把整个的 log 文件都导入进 Elasticsearch 中,那么我们可以配合 Filebeat 来进行使用。我们使用的架构如下。我们先使用 Filebeat 把日志收集起来,然后把数据再传给 Logstash。通过 Logstash 强大的数据清洗功能,最终把数据写入到 Elasticsearch 中,并由 Kibana 进行可视化。
可以把 Filebeat 的配置文件设置为:
filebeat_logstash.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /path-to-log-file/weblog-sample.log
output.logstash:
hosts: ["localhost:9900"]
要记得根据自己 weblog-sample.log 的位置修改上面的 paths。同时,由于一些原因,我们也同时也要做上面的 Logstash 的配置文件 weblog.conf 做很小的修正。把 useragent 里的 source 修改为 user-agent 而不是之前的 agent。
weblog.conf
input {
beats {
port => "9900"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
mutate {
convert => {
"bytes" => "integer"
}
}
geoip {
source => "clientip"
}
useragent {
source => "user_agent"
target => "useragent"
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output {
stdout {
codec => dots {}
}
elasticsearch {
hosts=>["localhost:9200"]
user=>"elastic"
password=>"123456"
index => "apache_elastic_example"
template => "/Users/liuxg/data/beats/apache_template.json"
template_name => "apache_elastic_example"
template_overwrite => true
}
}
可以在地址:beats-getstarted 或 example下载所有的代码。记得换掉上面的路径即可。在使用时,先要启动 Logstash,然后再启动 Filebeat。
bin/logstash -f weblog.conf
bin/filebeat -e -c filebeat_logstash.yml
在 Kibana 中,我们可以通过如下的命令来查看索引:
GET _cat/indices
从上面,可以看出来有30万个数据被导入到 Elasticsearch 中。
评论