Die vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit der Nachrichtenwarteschlange synchronisieren - Lösung

Die vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit der Nachrichtenwarteschlange synchronisieren - Lösung

1. Ursprüngliche Nachfrage

Es ist erforderlich, die ursprünglichen vollständigen Daten sowie die inkrementellen Daten bestimmter Tabellen in bestimmten MySQL-Bibliotheken in Echtzeit zu synchronisieren, und die entsprechenden Änderungen und Löschungen müssen ebenfalls synchronisiert werden.

Die Datensynchronisierung darf nicht aufdringlich sein: Geschäftsabläufe dürfen nicht geändert werden und auf der Geschäftsseite darf kein zu großer Leistungsdruck entstehen.

Anwendungsszenarien: ETL-Datensynchronisierung und Reduzierung des Drucks auf Unternehmensserver.

2. Lösung

3. Kanaleinführung und Installation

Canal ist ein Open-Source-Projekt von Alibaba, das in reinem Java entwickelt wurde. Basierend auf der inkrementellen Protokollanalyse der Datenbank ermöglicht es inkrementelles Datenabonnement und -verbrauch und unterstützt derzeit hauptsächlich MySQL (unterstützt auch MariaDB).

Funktionsprinzip: Implementierung der MySQL-Master-Slave-Replikation

Aus allgemeiner Sicht ist die Replikation in drei Schritte unterteilt:

  1. Der Master zeichnet die Änderungen im Binärprotokoll auf (diese Aufzeichnungen werden als Binärprotokollereignisse bezeichnet und können mit „show binlog events“ angezeigt werden).
  2. Der Slave kopiert die Binärprotokollereignisse des Masters in sein Relay-Protokoll.
  3. Der Slave wiederholt die Ereignisse im Relay-Protokoll und ändert die Daten, sodass sie seine eigenen widerspiegeln.

Wie der Kanal funktioniert

Das Prinzip ist relativ einfach:

  1. Canal simuliert das interaktive Protokoll des MySQL-Slaves, gibt sich als MySQL-Slave aus und sendet das Dump-Protokoll an den MySQL-Master
  2. Der MySQL-Master empfängt die Dump-Anforderung und beginnt mit dem Weiterleiten des Binärprotokolls an den Slave (Kanal).
  3. Canal analysiert binäre Log-Objekte (ursprünglich Byte-Streams)

Architektur

veranschaulichen:

  • Server stellt eine Kanallaufinstanz dar, die einer JVM entspricht
  • Eine Instanz entspricht einer Datenwarteschlange (ein Server entspricht 1..n Instanzen)

Instanzmodul:

  • eventParser (Datenquellenzugriff, Simulation des Slave-Protokolls und der Master-Interaktion, Protokollanalyse)
  • eventSink (Parser- und Store-Connector, führt Datenfilterung, -verarbeitung und -verteilung durch)
  • eventStore (Datenspeicher)
  • metaManager (inkrementeller Abonnement- und Verbrauchsinformationsmanager)

Installieren

1. Vorbereitung der MySQL- und Kafka-Umgebung

2. Laden Sie den Kanal herunter: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. Entpacken: tar -zxvf canal.deployer-1.1.3.tar.gz

4. Konfigurieren Sie die Dateiparameter im Verzeichnis conf

Konfigurieren Sie canal.properties:

Geben Sie conf/example ein und konfigurieren Sie instance.properties:

5. Starten Sie: bin/startup.sh

6. Protokollanzeige:

4. Überprüfung

1. Entwickeln Sie den entsprechenden Kafka-Consumer

Paket org.kafka;

importiere java.util.Arrays;
importiere java.util.Properties;
importiere org.apache.kafka.clients.consumer.ConsumerRecord;
importiere org.apache.kafka.clients.consumer.ConsumerRecords;
importiere org.apache.kafka.clients.consumer.KafkaConsumer;
importiere org.apache.kafka.common.serialization.StringDeserializer;


/**
 *
 * Titel: KafkaConsumerTest
 * Beschreibung:
 * Kafka-Consumer-Demo
 * Version: 1.0.0
 * @Autor pancm
 * @date 26. Januar 2018 */
öffentliche Klasse KafkaConsumerTest implementiert Runnable {

    privater finaler KafkaConsumer<String, String>-Verbraucher;
    private ConsumerRecords<String, String> msgList;
    privates endgültiges String-Thema;
    private statische endgültige Zeichenfolge GROUPID = "GruppeA";

    öffentlicher KafkaConsumerTest(String topicName) {
        Eigenschaften-Eigenschaften = neue Eigenschaften();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GRUPPENRID);
        props.put("aktivieren.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("Sitzung.Timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("Wert.deserializer", StringDeserializer.class.getName());
        this.consumer = neuer KafkaConsumer<String, String>(Eigenschaften);
        dieses.Thema = Themenname;
        this.consumer.subscribe(Arrays.asList(Thema));
    }

    @Überschreiben
    öffentliche Leere ausführen() {
        int Nachrichtennummer = 1;
        System.out.println("---------Verbrauch starten---------");
        versuchen {
            für (; ; ) {
                msgList = Verbraucher.Umfrage(1000);
                if (null != msgList && msgList.count() > 0) {
                    für (ConsumerRecord<String, String> Datensatz : msgList) {
                        //Drucken, nachdem 100 Datensätze verbraucht wurden, aber die gedruckten Daten folgen möglicherweise nicht diesem Muster System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());


// Zeichenfolge v = decodeUnicode(Datensatz.Wert());

// System.out.println(v);

                        // Beenden, wenn 1000 Nachrichten verbraucht sind, if (messageNo % 1000 == 0) {
                            brechen;
                        }
                        NachrichtNr++;
                    }
                } anders {
                    Thread.sleep(11);
                }
            }
        } Fang (UnterbrocheneAusnahme e) {
            e.printStackTrace();
        Endlich
            Verbraucher.schließen();
        }
    }

    öffentliche statische void main(String args[]) {
        KafkaConsumerTest test1 = neuer KafkaConsumerTest("Beispieldaten");
        Thread thread1 = neuer Thread(test1);
        thread1.start();
    }


    /*
     * Chinesisch in Unicode konvertieren*/
    öffentliche statische Zeichenfolge gbEncoding(finale Zeichenfolge gbString) {
        char[] utfBytes = gbString.toCharArray();
        Zeichenfolge unicodeBytes = "";
        für (int i = 0; i < utfBytes.Länge; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            wenn (hexB.Länge() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes = unicodeBytes + "\\u" + hexB;
        }
        UnicodeBytes zurückgeben;
    }

    /*
     * Unicode-Kodierung ins Chinesische*/
    öffentlicher statischer String decodeUnicode(finaler String dataStr) {
        Geben Sie den Startwert ein.
        int Ende = 0;
        endgültiger StringBuffer-Puffer = neuer StringBuffer();
        während (Start > -1) {
            Ende = dataStr.indexOf("\\u", Start + 2);
            Zeichenfolge charStr = "";
            wenn (Ende == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } anders {
                charStr = dataStr.substring(start + 2, end);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // Hexadezimale Ganzzahlzeichenfolge analysieren.
            Puffer.anhängen(neues Zeichen(Buchstabe).toString());
            Anfang = Ende;
        }
        gibt buffer.toString() zurück;

    }
}

2. Daten zur Tabelle bak1 hinzufügen

TABELLE ERSTELLEN `bak1` (
  `vin` varchar(20) NICHT NULL,
  `p1` doppelter STANDARD NULL,
  `p2` doppelter STANDARD NULL,
  `p3` doppelter STANDARD NULL,
  `p4` doppelter DEFAULT NULL,
  `p5` doppelter DEFAULT NULL,
  `p6` doppelter DEFAULT NULL,
  `p7` doppelter DEFAULT NULL,
  `p8` doppelter DEFAULT NULL,
  `p9` doppelter DEFAULT NULL,
  `p0` doppelter STANDARD NULL
) ENGINE=InnoDB STANDARD-CHARSET=utf8mb4

zeige, erstelle Tabelle bak1;

in bak1 einfügen, wähle '李雷abcv',
  `p1`,
  `p2`,
  `p3`,
  `p4`,
  `p5`,
  `p6`,
  `p7`,
  `p8`,
  `p9`,
  `p0` von Moci-Grenze 10

3. Zeigen Sie die Ausgabeergebnisse an:

Damit ist dieser Artikel über die Synchronisierung der vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit einer Nachrichtenwarteschlange abgeschlossen – Lösung. Weitere Informationen zur Synchronisierung von Daten in einer bestimmten MySQL-Tabelle finden Sie in früheren Artikeln auf 123WORDPRESS.COM oder in den folgenden verwandten Artikeln. Ich hoffe, Sie werden 123WORDPRESS.COM auch in Zukunft unterstützen!

Das könnte Sie auch interessieren:
  • Detaillierte Erklärung zur Synchronisierung von Daten von MySQL mit Elasticsearch
  • Tutorial zum Synchronisieren von MySQL-Daten mit ElasticSearch mithilfe von Python
  • Schritte zum Synchronisieren von MongoDB-Daten mit MySQL mithilfe von node.js
  • MySQL5.6 Master-Slave-Replikation (MySQL-Datensynchronisierungskonfiguration)
  • Detaillierte Erläuterung zur Reduzierung der MySQL Master-Slave-Datensynchronisationsverzögerung
  • MySQL-Trigger zum Synchronisieren von Daten zwischen zwei Tabellen
  • Zusammenfassung der Lösungen für das Problem Slave_IO_Running:No bei der MySQL-Datensynchronisierung
  • Methode zur Synchronisierung von MySQL-Sicherungs- und Migrationsdaten
  • Konfigurationsmethode für die MYSQL5-Masterslave-Datensynchronisation
  • So synchronisieren Sie Mysql-Daten

<<:  Eine detaillierte Einführung zum Einrichten von Jenkins auf Tencent Cloud Server

>>:  HTML-Code zum Hinzufügen eines Mengenabzeichens zur Nachrichtenschaltfläche

Artikel empfehlen

Detaillierte Analyse des Linux-NFS-Mechanismus anhand von Fällen

In Fortsetzung des vorherigen Artikels erstellen ...

Beispiel einer datenbankübergreifenden Abfrage in MySQL

Vorwort In MySQL werden datenbankübergreifende Ab...

Natives JS zur Realisierung eines einfachen Schlangenspiels

In diesem Artikel wird der spezifische Code von j...

HTML-Tabelle_Powernode Java Academy

Um eine Tabelle in HTML zu zeichnen, verwenden Si...

So finden und löschen Sie doppelte Zeilen in MySQL

Inhaltsverzeichnis 1. So finden Sie doppelte Zeil...

So ändern Sie die Systemsprache von CentOS7 in vereinfachtes Chinesisch

veranschaulichen Bei einer Eigeninstallation des ...

MySQL 8.0.21-Installationstutorial unter Windows-System (Abbildung und Text)

Installationsvorschlag : Versuchen Sie, für die I...

Was ist em? Einführung und Konvertierungsmethode von em und px

Was ist sie? „em“ bezieht sich auf die Schrifthöhe...

CSS-Beispielcode zum Festlegen des Bildlaufleistenstils

Der CSS-Implementierungscode zum Festlegen des Bi...

Detaillierte Erläuterung der Implementierung der Nginx-Prozesssperre

Inhaltsverzeichnis 1. Die Rolle der Nginx-Prozess...