Logstash是一个开源的数据转化工具,可以将不同的数据源的数据进行收集、分析、处理,并存储到指定的介质,常用于搜集服务器上的日志到elasticsearch,或者快速地将数据从一种介质转移到另一种介质。
官方提供了多种安装方式,也可以直接下载源码,源码解压即可用。
常用命令
./bin/logstash -f getdata.conf # 执行指定的配置文件
./bin/logstash -e input { } filter { } output { } # 直接执行指定的输入、过滤、输出管道,格式就是配置文件的格式,但要注意大括号前后都要加上空格,以免命令解析失败
常用插件
logstash-filter-urldecode/urlencode
- 用于对指定字段进行
urldecode/urlencode操作
可以在filter配置区块里这样指定:
filter {
urldecode {
all_fields => true
}
}
logstash-input-jdbc
- 用于连接数据库
- 使用
./bin/logstash-plugin install logstash-input-jdbc进行安装 - 一定要单独下载
mysql-connector-java的jar包,并一定得放在logstash的安装目录中的logstash-core/lib/lib/jars下(注意logstash的安装目录不一定在/opt下面),否则会出现找不到com.mysql.jdbc.Driver/jdbc_driver_library的错误,可以在这里下载指定的jar包版本 - 下面的参考配置,当实际执行的时候,其实是定时去执行这样的查询语句:
SELECT * FROM (SELECT id, field1, field2 FROM database_name.table_name WHERE id > 1000 ORDER BY id ASC) as 't1' LIMIT 1000 OFFSET 0,并会一直去执行,根据官方论坛的反馈,它没有终止条件,所以如果想要停止查询,只能主动kill掉该进程 - 参考配置文件如下:
input {
jdbc {
type => "type_name1"
jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&&useSSL=false&&allowMultiQueries=true"
jdbc_user => "database_name"
jdbc_password => "database_password"
jdbc_paging_enabled => "true" # 支持分页
jdbc_driver_library => "/opt/logstash/logstash-core/lib/jars/mysql-connector-java-6.0.6.jar" # 指定mysql-connector-java的jar包位置
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_default_timezone =>"Asia/Shanghai"
jdbc_page_size => "1000" # 每页数量
statement => "SELECT id, field1, field2 FROM database_name.table_name WHERE id > :sql_last_value ORDER BY id ASC;" # 定义查询语句(这里不用写上分页关键字,会自动分页)
schedule => "* * * * *" # 定时多久去查询一次
use_column_value => true
tracking_column => "id" # 跟踪列,会根据指定的列来进行跟踪,防止重复拉取
last_run_metadata_path => "/tmp/last_run_metadata" # 自定义跟踪文件地址,会将当前的跟踪列的最大值填入,下一次只会找比它大的数据
}
jdbc {
... # 参考上面的配置,可以同时连接多个数据库
}
}
filter {
if [type]=="type_name1"{
mutate {
replace => { my_field2 => "field1" } # 可以改变某个列的名称
replace => { business => "自定义的字段值" } # 可以自己添加列
}
if [type]=="type_name2"{
...
}
}
output {
if [type]=="type_name1"{
elasticsearch { # 直接输出到es
hosts => "localhost:9202"
index => "my_index" # 自定义的索引名
document_id => "%{id}" # 指定需要以哪一列来作为es的doc的id
}
if [type]=="type_name2"{
...
}
}
TroubleShooting
ERROR: Installation aborted, verification failed for logstash-input-jdbc,加上忽略验证的参数试试./logstash-plugin install --no-verify logstash-input-jdbc- logstash-input-jdbc分页不起作用: 特定版本确实有bug,issue,可以使用
./bin/logstash-plugin install --version 4.3.14 logstash-input-jdbc安装指定的版本 - Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the “path.data” setting.无法同时运行多个logstash实例: 需要在命令中加入
--path.data anotherpath指定logstash及其插件用于任何持久性需求的目录
评论 · Comments
评论由 Giscus 提供,需用 GitHub 账号登录;留言会同步到这个仓库的 Discussions 里。