字节笔记本字节笔记本

如何实现 使用docker-compose logstash 同步mysql 数据 es 实现关键字搜索

2023-03-08

使用docker-compose通过Logstash同步MySQL数据到Elasticsearch的步骤包括编写Logstash配置文件、配置docker-compose启动服务、添加Logstash过滤规则、在Elasticsearch中创建索引及配置mapping,以及在Kibana中进行搜索和可视化。

要实现使用docker-compose logstash同步MySQL数据到Elasticsearch,需要按照以下步骤进行操作:

  1. 编写logstash的配置文件logstash.conf,配置MySQL的input和Elasticsearch的output,以及数据的转换和过滤规则。示例配置文件如下:

input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
jdbc_user => "user"
jdbc_password => "password"
jdbc_driver_library => "/path/to/mysql-connector-java-8.0.23.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
schedule => "* * * * *"
statement => "SELECT * from table_name"
}
}

filter {
mutate {
remove_field => ["@version", "@timestamp"]
}
}

output {
elasticsearch {
hosts => "elasticsearch:9200"
index => "index_name"
}
}

  1. 在docker-compose.yml文件中配置logstash和elasticsearch服务的启动参数,启动服务:

version: '3.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.2
container_name: elasticsearch
environment:
- discovery.type=single-node
ports:
- "9200:9200"
logstash:
image: docker.elastic.co/logstash/logstash:7.11.2
container_name: logstash
command: logstash -f /usr/share/logstash/pipeline/logstash.conf
volumes:
- ./logstash/pipeline:/usr/share/logstash/pipeline
- ./mysql-connector-java-8.0.23.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.23.jar
links:
- elasticsearch
depends_on:
- elasticsearch

  1. 在logstash的配置文件中加入filter规则,用于对字段值进行分词、过滤、转换等操作,以实现关键字搜索。示例filter规则如下:

filter {
mutate {
remove_field => ["@version", "@timestamp"]
}
if [message] =~ /keyword/ {
grok {
match => { "message" => "%{WORD:field_name}: %{GREEDYDATA:field_value}" }
}
mutate {
add_field => { "field_name" => "field_type" }
lowercase => ["field_value"]
}
if [field_name] == "date" {
date {
match => ["field_value", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "field_value"
}
}
if [field_name] == "integer" {
mutate {
convert => { "field_value" => "integer" }
}
}
if [field_name] == "float" {
mutate {
convert => { "field_value" => "float" }
}
}
if [field_name] == "boolean" {
mutate {
convert => { "field_value" => "boolean" }
}
}
if [field_name] == "text" {
if [field_value] =~ /keyword/ {
mutate {
add_field => { "is_keyword" => "true" }
}
}
if [is_keyword] == "true" {
mutate {
split => { "field_value" => " " }
lowercase => ["field_value"]
}
}
}
}
}

  1. 在Elasticsearch中创建索引,并配置mapping,以便在查询时自动进行分词和过滤。示例mapping如下:

PUT /index_name
{
"mappings": {
"properties": {
"field_name": { "type": "text" },
"field_value": { "type": "text", "analyzer": "standard" }
}
}
}

  1. 在Kibana中创建索引pattern,并进行搜索和可视化展示。示例搜索和可视化如下:

GET /index_name/_search
{
"query": {
"match": {
"field_value": "keyword"
}
}
}

参考文献: