Lösung für das Datenasymmetrieproblem zwischen MySQL und Elasticsearch Das JDBC-Eingabe-Plugin kann nur Datenbankanhänge und inkrementelle Schreibvorgänge in Elasticsearch implementieren, aber die Datenbank auf der JDBC-Quellseite kann häufig Datenbanklösch- oder Aktualisierungsvorgänge durchführen. Dadurch entsteht eine Asymmetrie zwischen der Datenbank und der Datenbasis der Suchmaschine. Wenn Sie über ein Entwicklungsteam verfügen, können Sie natürlich ein Programm schreiben, um Suchmaschinenvorgänge beim Löschen oder Aktualisieren zu synchronisieren. Wenn Sie über diese Möglichkeit nicht verfügen, können Sie die folgende Methode ausprobieren. Hier ist ein Datentabellenartikel, das mtime-Feld ist als ON UPDATE CURRENT_TIMESTAMP definiert, sodass sich die Zeit jeder Aktualisierung von mtime ändert mysql> Beschreibungsartikel; +-------------+--------------+------+-----+--------------------------------+-----------+ | Feld | Typ | Null | Schlüssel | Standard | Extra | +-------------+--------------+------+-----+--------------------------------+-----------+ | Ich würde | int(11) | NEIN | | 0 | | | Titel | Mitteltext | NEIN | | NULL | | | Beschreibung | Mitteltext | JA | | NULL | | | Autor | varchar(100) | JA | | NULL | | | Quelle | varchar(100) | JA | | NULL | | | Inhalt | Langtext | JA | | NULL | | | Status | Aufzählung('J','N')| NEIN | | 'N' | | | ctime | Zeitstempel | NEIN | | AKTUELLER_ZEITSTEMPEL | | | mtime | Zeitstempel | JA | | BEI UPDATE CURRENT_TIMESTAMP | | +-------------+--------------+------+-----+--------------------------------+-----------+ 7 Zeilen im Satz (0,00 Sek.) Logstash fügt Abfrageregeln für mtime hinzu jdbc { jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Treiber" jdbc_connection_string => "jdbc:mysql://localhost:3306/cms" jdbc_user => "cms" jdbc_password => "Passwort" Zeitplan => "* * * * *" #Zeitgesteuerter Cron-Ausdruck, hier wird er einmal pro Minute ausgeführt. Anweisung => "select * from article where mtime > :sql_last_value" use_column_value => wahr Tracking-Spalte => "mtime" tracking_column_type => "Zeitstempel" record_last_run => wahr last_run_metadata_path => "/var/tmp/article-mtime.last" } Erstellen Sie eine Papierkorbtabelle, die zum Lösen des Problems der Datenbanklöschung oder des Deaktivierungsstatus = „N“ verwendet wird. Tabelle „elasticsearch_trash“ erstellen ( `id` int(11) NICHT NULL, `ctime` Zeitstempel NULL DEFAULT CURRENT_TIMESTAMP, PRIMÄRSCHLÜSSEL (`id`) ) ENGINE=InnoDB STANDARD-CHARSET=utf8 Trigger für die Artikeltabelle erstellen CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` VOR DEM UPDATE AUF `article` FÜR JEDE ZEILE BEGINNEN - Die Logik hier besteht darin, das Problem zu lösen, dass, wenn der Artikelstatus N ändert, die entsprechenden Daten in der Suchmaschine gelöscht werden müssen. WENN NEU.status = 'N' DANN in elasticsearch_trash(id) Werte (OLD.id) einfügen; ENDE, WENN; – Die Logik hierbei besteht darin, dass bei einer Änderung des Status auf Y die Artikel-ID weiterhin in der Methode elasticsearch_trash vorhanden ist, was zu einer versehentlichen Löschung führen kann. Daher müssen Sie die Recyclingdatensätze im Papierkorb löschen. WENN NEU.status = 'J' DANN aus elasticsearch_trash löschen, wobei ID = OLD.id; ENDE, WENN; ENDE CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` VOR DEM LÖSCHEN VON `article` FÜR JEDE ZEILE BEGINNEN – Die Logik hierbei besteht darin, dass ein Artikel beim Löschen in den Papierkorb der Suchmaschine verschoben wird. in elasticsearch_trash(id) Werte (OLD.id) einfügen; ENDE Als Nächstes müssen wir eine einfache Shell schreiben, die einmal pro Minute ausgeführt wird, um Daten aus der Datentabelle „elasticsearch_trash“ abzurufen, und dann mit dem Befehl „curl“ die Restful-Schnittstelle von Elasticsearch aufrufen, um die abgerufenen Daten zu löschen. Sie können auch verwandte Programme entwickeln. Hier ist ein Beispiel für eine geplante Spring-Boot-Aufgabe. juristische Person Paket cn.netkiller.api.domain.elasticsearch; importiere java.util.Date; importiere javax.persistence.Column; importiere javax.persistence.Entity; importiere javax.persistence.Id; importiere javax.persistence.Table; @Juristische Person @Tisch öffentliche Klasse ElasticsearchTrash { @Ausweis private int-ID; @Column(columnDefinition = "ZEITSTEMPEL STANDARD AKTUELLER_ZEITSTEMPEL") privates Datum ctime; öffentliche int getId() { Rückgabe-ID; } öffentliche void setId(int id) { diese.id = ID; } öffentliches Datum getCtime() { ctime zurückgeben; } öffentliche void setCtime(Datum ctime) { dies.ctime = ctime; } } Lagerhaus Paket cn.netkiller.api.repository.elasticsearch; importiere org.springframework.data.repository.CrudRepository; importiere com.example.api.domain.elasticsearch.ElasticsearchTrash; öffentliche Schnittstelle ElasticsearchTrashRepository erweitert CrudRepository<ElasticsearchTrash, Integer>{ } Geplante Aufgaben Paket cn.netkiller.api.schedule; importiere org.elasticsearch.action.delete.DeleteResponse; importiere org.elasticsearch.client.transport.TransportClient; importiere org.elasticsearch.rest.RestStatus; importiere org.slf4j.Logger; importiere org.slf4j.LoggerFactory; importiere org.springframework.beans.factory.annotation.Autowired; importiere org.springframework.scheduling.annotation.Scheduled; importiere org.springframework.stereotype.Component; importiere com.example.api.domain.elasticsearch.ElasticsearchTrash; importiere com.example.api.repository.elasticsearch.ElasticsearchTrashRepository; @Komponente öffentliche Klasse ScheduledTasks { privater statischer finaler Logger-Logger = LoggerFactory.getLogger (ScheduledTasks.class); @Autowired privater TransportClient-Client; @Autowired privates ElasticsearchTrashRepository alasticsearchTrashRepository; öffentliche ScheduledTasks() { } @Scheduled(fixedRate = 1000 * 60) // Führe die geplante Aufgabe alle 60 Sekunden aus public void cleanTrash() { für (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) { DeleteResponse-Antwort = Client.prepareDelete("Information", "Artikel", elasticsearchTrash.getId() + "").get(); RestStatus status = antwort.status(); logger.info("löschen {} {}", elasticsearchTrash.getId(), status.toString()); wenn (status == RestStatus.OK || status == RestStatus.NICHT_GEFUNDEN) { alasticsearchTrashRepository.delete(elasticsearchTrash); } } } } Spring Boot startet das Hauptprogramm. Paket cn.netkiller.api; importiere org.springframework.boot.SpringApplication; importiere org.springframework.boot.autoconfigure.SpringBootApplication; importiere org.springframework.scheduling.annotation.EnableScheduling; @SpringBootAnwendung @EnableScheduling öffentliche Klasse Anwendung { öffentliche statische void main(String[] args) { SpringApplication.run(Anwendung.Klasse, Argumente); } } Oben finden Sie eine Erklärung zur Lösung des Datenasymmetrieproblems zwischen MySQL und Elasticsearch. Wenn Sie Fragen haben, hinterlassen Sie bitte eine Nachricht oder diskutieren Sie in der Community dieser Site. Vielen Dank fürs Lesen und ich hoffe, es kann Ihnen helfen. Vielen Dank für Ihre Unterstützung dieser Site! Das könnte Sie auch interessieren:
|
<<: Versuchen Sie Docker+Nginx, um die Single-Page-Anwendungsmethode bereitzustellen
>>: Anwendungsbeispiele für die try_files-Direktive von Nginx
Verzeichnisstruktur . │ .env │ docker-compose.yml...
1. Entdecken Sie das Problem © ist das Co...
01. Befehlsübersicht Der Befehl tr kann Zeichen a...
Das Erstellen eines Images ist ein sehr wichtiger...
Inhaltsverzeichnis Vorwort 1. Technisches Prinzip...
Inhaltsverzeichnis 1. Komponente 2. Keep-Alive-Mo...
Inhaltsverzeichnis Installieren Sie CentOS7 Konfi...
Lernen Sie jeden Tag ein schwebendes jQuery-Plug-...
Inhaltsverzeichnis Vorwort Einführung in QueryCac...
In diesem Artikel finden Sie den spezifischen Cod...
In der Einleitung steht: Absolute sagte: „Relativ...
In diesem Artikel wird der spezifische Code des J...
Das Prinzip besteht darin, zuerst ein Div mit ein...
Überprüfen Sie die Transaktionsisolationsebene In...
Basierend auf Theorien wie Saussures Sprachphilos...