Logstash
是一个开源的数据转化工具,可以将不同的数据源的数据进行收集、分析、处理,并存储到指定的介质,常用于搜集服务器上的日志到elasticsearch
,或者快速地将数据从一种介质转移到另一种介质。
官方 提供了多种安装方式,也可以直接下载源码,源码解压即可用。
常用命令 1 2 ./bin/logstash -f getdata.conf # 执行指定的配置文件 ./bin/logstash -e input { } filter { } output { } # 直接执行指定的输入、过滤、输出管道,格式就是配置文件的格式,但要注意大括号前后都要加上空格,以免命令解析失败
常用插件
logstash-filter-urldecode/urlencode
用于对指定字段进行urldecode/urlencode
操作
可以在filter
配置区块里这样指定:
1 2 3 4 5 filter { urldecode { all_fields => true } }
用于连接数据库
使用./bin/logstash-plugin install logstash-input-jdbc
进行安装
一定要单独下载mysql-connector-java
的jar
包,并一定得放在logstash
的安装目录中的logstash-core/lib/lib/jars
下(注意logstash
的安装目录不一定在/opt
下面),否则会出现找不到com.mysql.jdbc.Driver/jdbc_driver_library
的错误,可以在这里下载 指定的jar包版本
下面的参考配置,当实际执行的时候,其实是定时去执行这样的查询语句:SELECT * FROM (SELECT id, field1, field2 FROM database_name.table_name WHERE id > 1000 ORDER BY id ASC) as 't1' LIMIT 1000 OFFSET 0
,并会一直去执行,根据官方论坛的反馈,它没有终止条件,所以如果想要停止查询,只能主动kill
掉该进程
参考配置文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 input { jdbc { type => "type_name1" jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&&useSSL=false&&allowMultiQueries=true" jdbc_user => "database_name" jdbc_password => "database_password" jdbc_paging_enabled => "true" # 支持分页 jdbc_driver_library => "/opt/logstash/logstash-core/lib/jars/mysql-connector-java-6.0.6.jar" # 指定mysql-connector-java的jar包位置 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_default_timezone =>"Asia/Shanghai" jdbc_page_size => "1000" # 每页数量 statement => "SELECT id, field1, field2 FROM database_name.table_name WHERE id > :sql_last_value ORDER BY id ASC;" # 定义查询语句(这里不用写上分页关键字,会自动分页) schedule => "* * * * *" # 定时多久去查询一次 use_column_value => true tracking_column => "id" # 跟踪列,会根据指定的列来进行跟踪,防止重复拉取 last_run_metadata_path => "/tmp/last_run_metadata" # 自定义跟踪文件地址,会将当前的跟踪列的最大值填入,下一次只会找比它大的数据 } jdbc { ... # 参考上面的配置,可以同时连接多个数据库 } } filter { if [type]=="type_name1"{ mutate { replace => { my_field2 => "field1" } # 可以改变某个列的名称 replace => { business => "自定义的字段值" } # 可以自己添加列 } if [type]=="type_name2"{ ... } } output { if [type]=="type_name1"{ elasticsearch { # 直接输出到es hosts => "localhost:9202" index => "my_index" # 自定义的索引名 document_id => "%{id}" # 指定需要以哪一列来作为es的doc的id } if [type]=="type_name2"{ ... } }
TroubleShooting
**ERROR: Installation aborted, verification failed for logstash-input-jdbc
**,加上忽略验证的参数试试./logstash-plugin install --no-verify logstash-input-jdbc
logstash-input-jdbc分页不起作用 : 特定版本确实有bug,issue ,可以使用./bin/logstash-plugin install --version 4.3.14 logstash-input-jdbc
安装指定的版本
Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the “path.data” setting.无法同时运行多个logstash实例 : 需要在命令中加入--path.data anotherpath
指定logstash及其插件用于任何持久性需求的目录