1.Logstash概述
Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到设置的的“存储库”中。
2.为什么使用Logstash
某台服务器部署了多个实例,则需要去每个应用实例的日志目录下去找日志文件。每个应用实例还会设置日志滚动策略(如:每天生成一个文件),
还有日志压缩归档策略等。如果我们能把这些日志集中管理,并提供集中检索功能,不仅可以提高诊断的效率,同时对系统情况有个全面的理解,避免事后救火的被动。
所以日志集中管理功能就可以使用ELK技术栈进行实现。Elasticsearch只有数据存储和分析的能力,Kibana就是可视化管理平台。还缺少数据收集和整理的角色,这个功能就是Logstash负责的。
3.Logstash工作原理
(1) Data Source
Logstash 支持的数据源有很多, 如:Spring Boot中默认推荐logback支持日志输出功能(输出到数据库、数据出到文件)。
(2) Logstash Pipeline
Logstash中包含非常重要的三个功能:
(1) Input:输入源,一般配置监听的主机及端口。DataSource向指定的ip及端口输出日志,Input 输入源监听到数据信息就可以进行收集。
(2) Filter:过滤功能,对收集到的信息进行过滤(额外处理),也可以省略这个配置(不做处理)
(3) Output:收集到的信息发送给谁, ELK技术栈中都是输出给Elasticsearch,后面数据检索和数据分析的过程就给Elasticsearch了。
3.采用Logstash收集SpringBoot日志
(1) 安装logstash
解压logstash-6.8.4.tar.gz(与当前es的版本匹配)
创建一个conf配置文件
#输入采用tcp监控 ,当前监控端口为5044
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 5044
}
}
#输出到es中,配置index索引为my-es-log-年月日
output {
elasticsearch {
hosts => ["http://hadoop208:9200","http://hadoop209:9200"]
index => "my-es-log-%{+YYYY.MM.dd}"
}
}
### 启动logstash
./logstash -f /usr/local/logstash/config/mylogstash.conf
4.在Springboot添加logback
(1)添加maven
<!-- 使用logback往logstash中写入日志 -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.3</version>
</dependency>
(2) 配置logback.xml(在resource目录下)
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml" />
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--配置logStash 服务地址-->
<destination>192.168.92.50:4560</destination>
<!-- 日志输出编码 -->
<encoder charset="UTF-8"
class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"logLevel": "%level",
"serviceName": "${springAppName:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="LOGSTASH" />
<appender-ref ref="CONSOLE" />
</root>
</configuration>
5.Logstash完成MySQL数据增量导入es
5.1 上传jdbc连接jar
(1) 下载地址
上传到:$LogStash_HOME/logstash_core/lib/jars/
(2) 创建mysql
CREATE TABLE `tb_item` (
`id` bigint(20) NOT NULL COMMENT '商品id,同时也是商品编号',
`title` varchar(100) NOT NULL COMMENT '商品标题',
`sell_point` varchar(500) DEFAULT NULL COMMENT '商品卖点',
`price` bigint(20) NOT NULL COMMENT '商品价格,单位为:分',
`num` int(10) NOT NULL COMMENT '库存数量',
`barcode` varchar(30) DEFAULT NULL COMMENT '商品条形码',
`image` varchar(500) DEFAULT NULL COMMENT '商品图片',
`cid` bigint(10) NOT NULL COMMENT '所属类目,叶子类目',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '商品状态,1-正常,2-下架,3-删除',
`created` datetime NOT NULL COMMENT '创建时间',
`updated` datetime NOT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `cid` (`cid`),
KEY `status` (`status`),
KEY `updated` (`updated`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='商品表';
LogStash实现增量导入,需要有一个定位字段,这个字段的数据,可以表示数据的新旧,代表这个数据是否是一个需要导入到ES中的数据。
每次读取数据的时候,都会记录一个最大的updated时间,每次读取数据的时候,都读取updated大于等于记录的定位字段数据。每次查询的就都是最新的,要导入到ES中的数据。
(3)编写logstash配置
在$LogStash_home/config/目录中,编写配置文件ego-items-db2es.conf
input {
jdbc {
# 连接地址
jdbc_connection_string => "jdbc:mysql://hadop202:3306/ego?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"
# 数据库用户名和密码
jdbc_user => "root"
jdbc_password => "123456"
# 驱动类,如果使用低版本的logstash,需要再增加配置 jdbc_driver_library,配置驱动包所在位置
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 是否开启分页逻辑
jdbc_paging_enabled => true
# 分页的长度是多少
jdbc_page_size => "2000"
# 时区
jdbc_default_timezone => "Asia/Shanghai"
# 执行的SQL
statement => "select id, title, sell_point, price, image, updated from tb_item where updated >= :sql_last_value order by updated asc"
# 执行SQL的周期, [秒] 分钟 小时 天 月 年
schedule => "* * * * *"
# 是否使用字段的值作为比较策略
use_column_value => true
# 作为比较策略的字段名称
tracking_column => "updated"
# 作为比较策略的字段类型,可选为numberic和timestamp
tracking_column_type => "timestamp"
# 记录最近的比较策略字段值的文件是什么,相对寻址路径是logstash的安装路径
last_run_metadata_path => "./ego-items-db2es-last-value"
# 是否每次执行SQL的时候,都删除last_run_metadata_path文件内容
clean_run => false
# 是否强制把ES中的字段名都定义为小写。
lowercase_column_names => false
}
}
output {
elasticsearch {
hosts => ["http://hadoop208:9200", "http://hadoop209:9200"]
index => "ego-items-index"
action => "index"
#指定document_id为数据库主键,保证更新
document_id => "%{id}"
}
}
(4) 安装logstash-input-jdbc插件
$Logstash_HOME/bin/logstash-plugin install logstash-input-jdbc
(5) 启动测试
bin/logstash -f 配置文件