Logstash 手册

Logstash是一个开源的数据转化工具,可以将不同的数据源的数据进行收集、分析、处理,并存储到指定的介质,常用于搜集服务器上的日志到elasticsearch,或者快速地将数据从一种介质转移到另一种介质。

官方提供了多种安装方式,也可以直接下载源码,源码解压即可用。

常用命令

1
2
./bin/logstash -f getdata.conf	# 执行指定的配置文件
./bin/logstash -e input { } filter { } output { } # 直接执行指定的输入、过滤、输出管道,格式就是配置文件的格式,但要注意大括号前后都要加上空格,以免命令解析失败

常用插件

logstash-input-jdbc

  • 用于连接数据库
  • 使用./bin/logstash-plugin install logstash-input-jdbc进行安装
  • 一定要单独下载mysql-connector-javajar包,并一定得放在logstash的安装目录中的logstash-core/lib/lib/jars下(注意logstash的安装目录不一定在/opt下面),否则会出现找不到com.mysql.jdbc.Driver/jdbc_driver_library的错误,可以在这里下载指定的jar包版本
  • 下面的参考配置,当实际执行的时候,其实是定时去执行这样的查询语句:SELECT * FROM (SELECT id, field1, field2 FROM database_name.table_name WHERE id > 1000 ORDER BY id ASC) as 't' LIMIT 0, 1000,并会一直去执行,根据官方论坛的反馈,它没有终止条件,所以如果想要停止查询,只能主动kill掉该进程
  • 参考配置文件如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
input {
jdbc {
type => "type_name1"
jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&&useSSL=false&&allowMultiQueries=true"
jdbc_user => "database_name"
jdbc_password => "database_password"
jdbc_paging_enabled => "true" # 支持分页
jdbc_driver_library => "/opt/logstash/logstash-core/lib/jars/mysql-connector-java-6.0.6.jar" # 指定mysql-connector-java的jar包位置
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_default_timezone =>"Asia/Shanghai"
jdbc_page_size => "1000" # 每页数量
statement => "SELECT id, field1, field2 FROM database_name.table_name WHERE id > :sql_last_value ORDER BY id ASC;" # 定义查询语句(这里不用写上分页关键字,会自动分页)
schedule => "* * * * *" # 定时多久去查询一次
use_column_value => true
tracking_column => "id" # 跟踪列,会根据指定的列来进行跟踪,防止重复拉取
last_run_metadata_path => "/tmp/last_run_metadata" # 自定义跟踪文件地址,会将当前的跟踪列的最大值填入,下一次只会找比它大的数据
}
jdbc {
... # 参考上面的配置,可以同时连接多个数据库
}
}
filter {
if [type]=="type_name1"{
mutate {
replace => { my_field2 => "field1" } # 可以改变某个列的名称
replace => { business => "自定义的字段值" } # 可以自己添加列
}
if [type]=="type_name2"{
...
}
}
output {
if [type]=="type_name1"{
elasticsearch { # 直接输出到es
hosts => "localhost:9202"
index => "my_index" # 自定义的索引名
document_id => "%{id}" # 指定需要以哪一列来作为es的doc的id
}
if [type]=="type_name2"{
...
}
}

TroubleShooting

  • ERROR: Installation aborted, verification failed for logstash-input-jdbc,加上忽略验证的参数试试./logstash-plugin install --no-verify logstash-input-jdbc