Lösung für das Datenasymmetrieproblem zwischen MySQL und Elasticsearch

Lösung für das Datenasymmetrieproblem zwischen MySQL und Elasticsearch

Lösung für das Datenasymmetrieproblem zwischen MySQL und Elasticsearch

Das JDBC-Eingabe-Plugin kann nur Datenbankanhänge und inkrementelle Schreibvorgänge in Elasticsearch implementieren, aber die Datenbank auf der JDBC-Quellseite kann häufig Datenbanklösch- oder Aktualisierungsvorgänge durchführen. Dadurch entsteht eine Asymmetrie zwischen der Datenbank und der Datenbasis der Suchmaschine.

Wenn Sie über ein Entwicklungsteam verfügen, können Sie natürlich ein Programm schreiben, um Suchmaschinenvorgänge beim Löschen oder Aktualisieren zu synchronisieren. Wenn Sie über diese Möglichkeit nicht verfügen, können Sie die folgende Methode ausprobieren.

Hier ist ein Datentabellenartikel, das mtime-Feld ist als ON UPDATE CURRENT_TIMESTAMP definiert, sodass sich die Zeit jeder Aktualisierung von mtime ändert

mysql> Beschreibungsartikel;
+-------------+--------------+------+-----+--------------------------------+-----------+
| Feld | Typ | Null | Schlüssel | Standard | Extra |
+-------------+--------------+------+-----+--------------------------------+-----------+
| Ich würde | int(11) | NEIN | | 0 | |
| Titel | Mitteltext | NEIN | | NULL | |
| Beschreibung | Mitteltext | JA | | NULL | |
| Autor | varchar(100) | JA | | NULL | |
| Quelle | varchar(100) | JA | | NULL | |
| Inhalt | Langtext | JA | | NULL | |
| Status | Aufzählung('J','N')| NEIN | | 'N' | |
| ctime | Zeitstempel | NEIN | | AKTUELLER_ZEITSTEMPEL | |
| mtime | Zeitstempel | JA | | BEI UPDATE CURRENT_TIMESTAMP | |
+-------------+--------------+------+-----+--------------------------------+-----------+
7 Zeilen im Satz (0,00 Sek.)

Logstash fügt Abfrageregeln für mtime hinzu

jdbc {
  jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
  jdbc_driver_class => "com.mysql.jdbc.Treiber"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
  jdbc_user => "cms"
  jdbc_password => "Passwort"
  Zeitplan => "* * * * *" #Zeitgesteuerter Cron-Ausdruck, hier wird er einmal pro Minute ausgeführt. Anweisung => "select * from article where mtime > :sql_last_value"
  use_column_value => wahr
  Tracking-Spalte => "mtime"
  tracking_column_type => "Zeitstempel" 
  record_last_run => wahr
  last_run_metadata_path => "/var/tmp/article-mtime.last"
 }

Erstellen Sie eine Papierkorbtabelle, die zum Lösen des Problems der Datenbanklöschung oder des Deaktivierungsstatus = „N“ verwendet wird.

Tabelle „elasticsearch_trash“ erstellen (
 `id` int(11) NICHT NULL,
 `ctime` Zeitstempel NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMÄRSCHLÜSSEL (`id`)
) ENGINE=InnoDB STANDARD-CHARSET=utf8

Trigger für die Artikeltabelle erstellen

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` VOR DEM UPDATE AUF `article` FÜR JEDE ZEILE
BEGINNEN
 - Die Logik hier besteht darin, das Problem zu lösen, dass, wenn der Artikelstatus N ändert, die entsprechenden Daten in der Suchmaschine gelöscht werden müssen.
 WENN NEU.status = 'N' DANN
 in elasticsearch_trash(id) Werte (OLD.id) einfügen;
 ENDE, WENN;
 – Die Logik hierbei besteht darin, dass bei einer Änderung des Status auf Y die Artikel-ID weiterhin in der Methode elasticsearch_trash vorhanden ist, was zu einer versehentlichen Löschung führen kann. Daher müssen Sie die Recyclingdatensätze im Papierkorb löschen.
  WENN NEU.status = 'J' DANN
 aus elasticsearch_trash löschen, wobei ID = OLD.id;
 ENDE, WENN;
ENDE

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` VOR DEM LÖSCHEN VON `article` FÜR JEDE ZEILE
BEGINNEN
 – Die Logik hierbei besteht darin, dass ein Artikel beim Löschen in den Papierkorb der Suchmaschine verschoben wird.
 in elasticsearch_trash(id) Werte (OLD.id) einfügen;
ENDE

Als Nächstes müssen wir eine einfache Shell schreiben, die einmal pro Minute ausgeführt wird, um Daten aus der Datentabelle „elasticsearch_trash“ abzurufen, und dann mit dem Befehl „curl“ die Restful-Schnittstelle von Elasticsearch aufrufen, um die abgerufenen Daten zu löschen.

Sie können auch verwandte Programme entwickeln. Hier ist ein Beispiel für eine geplante Spring-Boot-Aufgabe.

juristische Person

Paket cn.netkiller.api.domain.elasticsearch;

importiere java.util.Date;

importiere javax.persistence.Column;
importiere javax.persistence.Entity;
importiere javax.persistence.Id;
importiere javax.persistence.Table;

@Juristische Person
@Tisch
öffentliche Klasse ElasticsearchTrash {
 @Ausweis
 private int-ID;

 @Column(columnDefinition = "ZEITSTEMPEL STANDARD AKTUELLER_ZEITSTEMPEL")
 privates Datum ctime;

 öffentliche int getId() {
 Rückgabe-ID;
 }

 öffentliche void setId(int id) {
 diese.id = ID;
 }

 öffentliches Datum getCtime() {
 ctime zurückgeben;
 }

 öffentliche void setCtime(Datum ctime) {
 dies.ctime = ctime;
 }

}

Lagerhaus

Paket cn.netkiller.api.repository.elasticsearch;

importiere org.springframework.data.repository.CrudRepository;

importiere com.example.api.domain.elasticsearch.ElasticsearchTrash;

öffentliche Schnittstelle ElasticsearchTrashRepository erweitert CrudRepository<ElasticsearchTrash, Integer>{


}

Geplante Aufgaben

Paket cn.netkiller.api.schedule;

importiere org.elasticsearch.action.delete.DeleteResponse;
importiere org.elasticsearch.client.transport.TransportClient;
importiere org.elasticsearch.rest.RestStatus;
importiere org.slf4j.Logger;
importiere org.slf4j.LoggerFactory;
importiere org.springframework.beans.factory.annotation.Autowired;
importiere org.springframework.scheduling.annotation.Scheduled;
importiere org.springframework.stereotype.Component;

importiere com.example.api.domain.elasticsearch.ElasticsearchTrash;
importiere com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Komponente
öffentliche Klasse ScheduledTasks {
 privater statischer finaler Logger-Logger = LoggerFactory.getLogger (ScheduledTasks.class);

 @Autowired
 privater TransportClient-Client;

 @Autowired
 privates ElasticsearchTrashRepository alasticsearchTrashRepository;

 öffentliche ScheduledTasks() {
 }

 @Scheduled(fixedRate = 1000 * 60) // Führe die geplante Aufgabe alle 60 Sekunden aus public void cleanTrash() {
 für (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {
  DeleteResponse-Antwort = Client.prepareDelete("Information", "Artikel", elasticsearchTrash.getId() + "").get();
  RestStatus status = antwort.status();
  logger.info("löschen {} {}", elasticsearchTrash.getId(), status.toString());
  wenn (status == RestStatus.OK || status == RestStatus.NICHT_GEFUNDEN) {
  alasticsearchTrashRepository.delete(elasticsearchTrash);
  }
 }
 }
}

Spring Boot startet das Hauptprogramm.

Paket cn.netkiller.api;

importiere org.springframework.boot.SpringApplication;
importiere org.springframework.boot.autoconfigure.SpringBootApplication;
importiere org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootAnwendung
@EnableScheduling
öffentliche Klasse Anwendung {

 öffentliche statische void main(String[] args) {
 SpringApplication.run(Anwendung.Klasse, Argumente);
 }
}
 

Oben finden Sie eine Erklärung zur Lösung des Datenasymmetrieproblems zwischen MySQL und Elasticsearch. Wenn Sie Fragen haben, hinterlassen Sie bitte eine Nachricht oder diskutieren Sie in der Community dieser Site. Vielen Dank fürs Lesen und ich hoffe, es kann Ihnen helfen. Vielen Dank für Ihre Unterstützung dieser Site!

Das könnte Sie auch interessieren:
  • Installation und Konfiguration von MySQL 5.6 unter Windows mit Screenshots und ausführlicher Anleitung
  • Verwendungshinweise für die Mysql-String-Interception-Funktion SUBSTRING
  • Zusammenfassung der Verwendung des MySQL-Datumsdatentyps und des Zeittyps
  • MySQL-Benutzererstellung und Autorisierungsmethode
  • Anweisungen zur Verwendung der MySQL CASE WHEN-Anweisung
  • Detaillierte Verwendung der MySQL-Update-Anweisung
  • MySQL-Tipps: Lösung für das Problem, dass der Server beendet wird, ohne die PID-Datei zu aktualisieren

<<:  Versuchen Sie Docker+Nginx, um die Single-Page-Anwendungsmethode bereitzustellen

>>:  Anwendungsbeispiele für die try_files-Direktive von Nginx

Artikel empfehlen

Die umfassendste Erklärung des Sperrmechanismus in MySQL

Inhaltsverzeichnis Vorwort Globale Sperre Vollstä...

Details zum Like-Operator in MySQL

1. Einleitung Beim Filtern unbekannter oder teilw...

Der Unterschied zwischen ${param} und #{param} in MySQL

Der von ${param} übergebene Parameter wird als Te...

Der Unterschied zwischen HTML-Iframe und Frameset_PowerNode Java Academy

Einführung 1. <iframe>-Tag: Ein Iframe ist ...

React Native realisiert den Auf- und Ab-Pull-Effekt der Überwachungsgeste

React Native implementiert die Überwachungsgeste ...

Tutorial zur MySQL-Installation unter Linux (Binärdistribution)

Dieses Tutorial beschreibt Ihnen die detaillierte...

Lösung für MySQL 8.0 kann nicht gestartet werden 3534

Der MySQL 8.0-Dienst kann nicht gestartet werden ...

Detaillierte Erläuterung der Methode zum Vergleichen von Daten in MySQL

Wenn es eine Tabelle mit einem Feld „add_time“ gi...

Designideen für MySQL-Backup und -Wiederherstellung

Hintergrund Lassen Sie mich zunächst den Hintergr...

jQuery-Plugin zum Implementieren eines schwebenden Menüs

Lernen Sie jeden Tag ein schwebendes jQuery-Plug-...

Tutorial zur Installation von MySQL 5.6 auf CentOS 6.5

1. Laden Sie das RPM-Paket für Linux herunter htt...