Kafka LogStash ES

Kafka LogStash ES

java 程序日志通过 logback,将日志写入kafka,在通过logstash采集kafka中数据,将数据写入ES中,最后通过 Kibana 查询程序日志。

中文编码

问题: 程序日志中文写入 kafka,最后在Kibana中以Unicode的方式显示。

解决方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
input {
kafka {
bootstrap_servers => "kakfa ip"
group_id => "Cosumer_10-127-28-11"
topics => "logstash"
<!-- codec => json_lines 修改中文显示方式 -->
codec => json_lines
}
}

filter {

}

output {
elasticsearch{
action => "index"
hosts => ["es ip"]
user => "elastic"
password => "changeme"
index => "applog"
}
}

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>4.11</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC1</version>
</dependency>

logback.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

<?xml version="1.0" encoding="UTF-8"?>
<!--该日志将日志级别不同的log信息保存到不同的文件中 -->
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>

<springProperty scope="context" name="springAppName"
source="spring.application.name"/>

<springProperty scope="context" name="springAppPort"
source="server.port"/>

<appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeContext>true</includeContext>
<includeCallerData>true</includeCallerData>
<customFields>{"system":"project-name"}</customFields>
<fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
</encoder>
<topic>logstash</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
<producerConfig>bootstrap.servers=</producerConfig>
</appender>

<!-- 日志输出级别 -->
<root level="INFO">
<appender-ref ref="KafkaAppender"/>
</root>
</configuration>