一架梯子,一头程序猿,仰望星空!

Logstash - 同步MYSQL数据到Elasticsearch


在实际项目场景中,业务数据主流的存储方案还是MYSQL,但是MYSQL处理海量数据的搜索能力较差,目前MYSQL搭配ES,为业务提供强大的数据搜索能力是业界主流的方案,因此需要解决如何将MYSQL中的数据导入到ES中,下面介绍通过Logstash准实时的将MYSQL数据导入到ES中。

1.jdbc插件介绍

Logstash通过jdbc input插件实现定时同步MYSQL数据,了解JAVA的同学应该对jdbc不陌生,就是访问数据库的API标准,我们常见的数据库都可以使用jdbc接口进行访问。

使用jdbc访问数据库,通常都需要安装对应数据库的jdbc驱动,例如:MYSQL的jdbc驱动,到MYSQL官网下载对应的jar包就可以。

MYSQL jdbc驱动下载地址:

https://mvnrepository.com/artifact/mysql/mysql-connector-java

找到MYSQL对应的版本下载JAR包即可,例如下面下载8.0.15版本。

2.简单的同步例子

关键配置有两点:

  • 配置input jdbc输入插件
  • 配置output elasticsearch输出插件

完整的配置如下

input {
  # 配置JDBC数据源
  jdbc {
    # mysql jdbc驱动路径
    jdbc_driver_library => "/Users/tizi365/.m2/repository/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar"
    # mysql jdbc驱动类
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # MYSQL连接地址,格式: jdbc:mysql://服务器地址:端口/数据库名
    jdbc_connection_string => "jdbc:mysql://localhost:3306/wordpress"
    # MYSQL 账号
    jdbc_user => "root"
    # MYSQL 密码
    jdbc_password => "123456"
    # 定时任务配置,下面表示每分钟执行一次SQL
    # 具体语法请参考下一个章节内容
    schedule => "* * * * *"
    # 定时执行的SQL语句,Logstash会根据schedule配置,定时执行这里的SQL语句
    # 将SQL语句查询的结果,传给output插件
    statement => "SELECT * FROM `wp_posts`"
  }
}

output {
    stdout {
      # 配置将数据导入到ES中
      elasticsearch {
        # 索引名,logstash会将数据导入到这个索引中
        index => "wp_posts"
        # ES服务器地址,支持多个地址
        hosts => ["127.0.0.1:9200","127.0.0.2:9200"]
        # 设置ES文档的唯一Id值为SQL语句返回的id
        # 建议将document_id设置为MYSQL表的主键
        document_id => "%{id}"
      }
    }
}

3.定时任务配置

jdbc schedule的配置规则,类似linux的crontab的写法,具体语法规则如下:

语法格式,总共由5个字段组成,含义如下:

  *    *   *   *  * 

  分   时  天   月 星期

各个字段取值范围:

  • 分 - 0-59
  • 时 - 0-23
  • 天 - 1-31
  • 月 - 1-12
  • 星期 - 0-7

特殊字符含义:

  • 星号(*) :代表所有值,例如:第一个字段是星号(*),则代表每分钟。
  • 逗号(,):指定一个数值范围,例如:1,2,3,4
  • 横杠(-):另外一种表示一个整数范围的方法,例如:1-4 表示1,2,3,4
  • 斜线(/):可以用斜线指定时间的间隔频率,例如:*/5,如果用在分钟字段,表示每5分钟执行一次。

例子:

# 每分钟执行一次
* * * * *
# 每10分钟执行一次
*/10 * * * *
# 每小时执行一次
* */1 * * *
# 每天0点执行一次
0 0 * * *
# 每天凌晨2点1分执行一次
1 2 * * *

4.增量同步数据

前面的例子同步数据的SQL如下:

input {
  # 配置JDBC数据源
  jdbc {
    # 忽略其他配置
    statement => "SELECT * FROM `wp_posts`"
  }
}

同步数据的SQL语句,直接扫描全表的数据,如果数据量比较小,问题不大,如果数据量比较大,会直接卡死,logstash OOM挂了,因此需要实现增量同步,每次仅同步新增的数据。

Logstash提供了sql_last_value字段值,帮助我们实现增量同步;增量同步的核心思路就是,logstash每次执行SQL的时候,会将SQL查询结果的最后一条记录的某个值保存到sql_last_value字段中,下一次执行SQL的时候,以sql_last_value值作为参考,从这个值往后查询新数据。

例子:

input {
  jdbc {
    # 注意where条件id > :sql_last_value
    # 每次执行SQL的时候,id大于sql_last_value的值
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    # 允许sql_last_value的值来自查询结果的某个字段值。
    use_column_value => true
    # sql_last_value的值来自查询结果中的最后一个id值
    tracking_column => "id"
    # ... 忽略其他配置
  }
}

说明:

sql_last_value的默认值是0或者1970-01-01,具体是什么值跟数据类型有关,上面的例子,定时任务执行SQL如下

# 第一次执行,sql_last_value=0
SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 0

# 第二次执行,sql_last_value=100,假设上面的SQL最后的id值是100
SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 100

# 第三次执行,sql_last_value=200,,假设上面的SQL最后的id值是200
SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 200

提示:

上面的例子,使用id作为增量同步数据的依据,不一定适合所有的业务场景,例如:同步文章数据,文章更新了,但是文章的id没有更新,这个时候使用id作为增量同步的依据,会导致更新的文章没有同步到ES,这种场景适合使用更新时间作为增量同步的依据,用法一样,sql_last_value换一个字段值即可。

5.分页

前面的章节在实现增量同步的时候,也存在一个问题,如果增量同步的数据太多的时候,logstash也会卡死,尤其是首次增量同步,例如:一个MYSQL表的数据有100万,首次增量同步数据,会扫描全表的数据。

logstash jdbc插件执行分页查询,避免一次查询太多数据,配置如下:

input {
  jdbc {
    # 激活分页处理
    jdbc_paging_enabled => true
    # 分页大小,每次查询1000条数据
    jdbc_page_size => 1000
    # sql语句
    statement => "SELECT * FROM my_table"
    # ... 忽略其他配置
  }
}

6.大表同步

在实际业务场景中,有些数据表的数据会有几百万,甚至上亿的数据,那么在使用logstash同步这些大表数据的时候,结合前面两个章节的增量同步和分页处理就可以解决,不过需要注意深度分页的性能问题。

例如:

# 每次查询1000条数据,但是翻页从第500万条数据偏移开始
SELECT * FROM my_table limit 5000000, 1000

这条SQL会非常慢,可以借助索引覆盖优化性能。

例子:

SELECT * FROM my_table WHERE id in (SELECT id FROM my_table limit 5000000, 1000)

因为id是主键,在主键索引中已经包含id的值,不需要回表扫描磁盘的数据,所以性能比较好,上面的SQL首先借助索引覆盖将id值查询出来,然后根据id查询具体的数据。