MySQL 与 Elasticsearch 数据不对称问题解决办法

发布时间 - 2026-01-11 02:56:04    点击率:

MySQL 与 Elasticsearch 数据不对称问题解决办法

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field    | Type     | Null | Key | Default            | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id     | int(11)   | NO  |   | 0               |    |
| title    | mediumtext  | NO  |   | NULL              |    |
| description | mediumtext  | YES |   | NULL              |    |
| author   | varchar(100) | YES |   | NULL              |    |
| source   | varchar(100) | YES |   | NULL              |    |
| content   | longtext   | YES |   | NULL              |    |
| status   | enum('Y','N')| NO  |   | 'N'              |    |
| ctime    | timestamp  | NO  |   | CURRENT_TIMESTAMP       |    |
| mtime    | timestamp  | YES |   | ON UPDATE CURRENT_TIMESTAMP  |    |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "password"
  schedule => "* * * * *" #定时cron的表达式,这里是每分钟执行一次
  statement => "select * from article where mtime > :sql_last_value"
  use_column_value => true
  tracking_column => "mtime"
  tracking_column_type => "timestamp" 
  record_last_run => true
  last_run_metadata_path => "/var/tmp/article-mtime.last"
 }

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (
 `id` int(11) NOT NULL,
 `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
 -- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。
 IF NEW.status = 'N' THEN
 insert into elasticsearch_trash(id) values(OLD.id);
 END IF;
 -- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。
  IF NEW.status = 'Y' THEN
 delete from elasticsearch_trash where id = OLD.id;
 END IF;
END

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
 -- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。
 insert into elasticsearch_trash(id) values(OLD.id);
END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
 @Id
 private int id;

 @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
 private Date ctime;

 public int getId() {
 return id;
 }

 public void setId(int id) {
 this.id = id;
 }

 public Date getCtime() {
 return ctime;
 }

 public void setCtime(Date ctime) {
 this.ctime = ctime;
 }

}

仓库

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{


}

定时任务

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
 private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

 @Autowired
 private TransportClient client;

 @Autowired
 private ElasticsearchTrashRepository alasticsearchTrashRepository;

 public ScheduledTasks() {
 }

 @Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务
 public void cleanTrash() {
 for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
  DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();
  RestStatus status = response.status();
  logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
  if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
  alasticsearchTrashRepository.delete(elasticsearchTrash);
  }
 }
 }
}

Spring boot 启动主程序。

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {

 public static void main(String[] args) {
 SpringApplication.run(Application.class, args);
 }
}
 

以上就是MySQL 与 Elasticsearch 数据不对称问题解决办法的讲解,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


# MySQL  #   # Elasticsearch  # Elasticsearch数据不对称处理  # MySQL数据同步Elasticsearch的4种方案  # logstash将mysql数据同步到elasticsearch方法详解  # 使用logstash同步mysql数据到elasticsearch实现  # 使用canal监控mysql数据库实现elasticsearch索引实时更新问题  # Mysql到Elasticsearch高效实时同步Debezium实现  # 详解Mysql如何实现数据同步到Elasticsearch  # 用python简单实现mysql数据同步到ElasticSearch的教程  # 如何在Elasticsearch中启用和使用SQL功能  # 不对称  # 解决办法  # 每分钟  # 有一  # 如有  # 主程序  # 你还  # 希望能  # 这种情况  # 谢谢大家  # 提供一个  # 会做  # 中得  # 如果你没有  # 出现了  # 疑问请  # 误删除  # jdbc_user  # jdbc_password  # usr 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: 电商网站制作多少钱一个,电子商务公司的网站制作费用计入什么科目?  Swift中switch语句区间和元组模式匹配  如何彻底卸载建站之星软件?  Laravel怎么创建控制器Controller_Laravel路由绑定与控制器逻辑编写【指南】  消息称 OpenAI 正研发的神秘硬件设备或为智能笔,富士康代工  Laravel如何实现多表关联模型定义_Laravel多对多关系及中间表数据存取【方法】  Laravel如何创建自定义Artisan命令?(代码示例)  免费视频制作网站,更新又快又好的免费电影网站?  Win11关机界面怎么改_Win11自定义关机画面设置【工具】  Laravel如何使用withoutEvents方法临时禁用模型事件  如何快速查询网址的建站时间与历史轨迹?  浅述节点的创建及常见功能的实现  如何挑选优质建站一级代理提升网站排名?  Laravel如何创建和注册中间件_Laravel中间件编写与应用流程  如何在橙子建站上传落地页?操作指南详解  Laravel定时任务怎么设置_Laravel Crontab调度器配置  网站制作大概多少钱一个,做一个平台网站大概多少钱?  Laravel如何配置任务调度?(Cron Job示例)  Laravel如何使用模型观察者?(Observer代码示例)  Gemini怎么用新功能实时问答_Gemini实时问答使用【步骤】  Claude怎样写结构化提示词_Claude结构化提示词写法【教程】  Laravel如何处理JSON字段_Eloquent原生JSON字段类型操作教程  简单实现Android文件上传  韩国网站服务器搭建指南:VPS选购、域名解析与DNS配置推荐  Laravel怎么导出Excel文件_Laravel Excel插件使用教程  Laravel怎么写单元测试_PHPUnit在Laravel项目中的基础测试入门  rsync同步时出现rsync: failed to set times on “xxxx”: Operation not permitted  黑客如何利用漏洞与弱口令入侵网站服务器?  Laravel如何使用Passport实现OAuth2?(完整配置步骤)  ChatGPT怎么生成Excel公式_ChatGPT公式生成方法【指南】  Laravel怎么使用Blade模板引擎_Laravel模板继承与Component组件复用【手册】  如何在VPS电脑上快速搭建网站?  nodejs redis 发布订阅机制封装实现方法及实例代码  JavaScript数据类型有哪些_如何准确判断一个变量的类型  Claude怎样写约束型提示词_Claude约束提示词写法【教程】  油猴 教程,油猴搜脚本为什么会网页无法显示?  如何获取PHP WAP自助建站系统源码?  七夕网站制作视频,七夕大促活动怎么报名?  Laravel中Service Container是做什么的_Laravel服务容器与依赖注入核心概念解析  html5源代码发行怎么设置权限_访问权限控制方法与实践【指南】  中山网站制作网页,中山新生登记系统登记流程?  Laravel如何实现一对一模型关联?(Eloquent示例)  香港服务器租用费用高吗?如何避免常见误区?  浅谈redis在项目中的应用  Laravel如何实现邮件验证激活账户_Laravel内置MustVerifyEmail接口配置【步骤】  laravel服务容器和依赖注入怎么理解_laravel服务容器与依赖注入解析  Laravel表单请求验证类怎么用_Laravel Form Request分离验证逻辑教程  如何用搬瓦工VPS快速搭建个人网站?  Android实现代码画虚线边框背景效果  韩国服务器如何优化跨境访问实现高效连接?