1. Ursprüngliche NachfrageEs ist erforderlich, die ursprünglichen vollständigen Daten sowie die inkrementellen Daten bestimmter Tabellen in bestimmten MySQL-Bibliotheken in Echtzeit zu synchronisieren, und die entsprechenden Änderungen und Löschungen müssen ebenfalls synchronisiert werden. Die Datensynchronisierung darf nicht aufdringlich sein: Geschäftsabläufe dürfen nicht geändert werden und auf der Geschäftsseite darf kein zu großer Leistungsdruck entstehen. Anwendungsszenarien: ETL-Datensynchronisierung und Reduzierung des Drucks auf Unternehmensserver. 2. Lösung 3. Kanaleinführung und InstallationCanal ist ein Open-Source-Projekt von Alibaba, das in reinem Java entwickelt wurde. Basierend auf der inkrementellen Protokollanalyse der Datenbank ermöglicht es inkrementelles Datenabonnement und -verbrauch und unterstützt derzeit hauptsächlich MySQL (unterstützt auch MariaDB). Funktionsprinzip: Implementierung der MySQL-Master-Slave-Replikation Aus allgemeiner Sicht ist die Replikation in drei Schritte unterteilt:
Wie der Kanal funktioniertDas Prinzip ist relativ einfach:
Architekturveranschaulichen:
Instanzmodul:
Installieren1. Vorbereitung der MySQL- und Kafka-Umgebung 2. Laden Sie den Kanal herunter: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz 3. Entpacken: tar -zxvf canal.deployer-1.1.3.tar.gz 4. Konfigurieren Sie die Dateiparameter im Verzeichnis conf Konfigurieren Sie canal.properties: Geben Sie conf/example ein und konfigurieren Sie instance.properties: 5. Starten Sie: bin/startup.sh 6. Protokollanzeige: 4. Überprüfung1. Entwickeln Sie den entsprechenden Kafka-Consumer Paket org.kafka; importiere java.util.Arrays; importiere java.util.Properties; importiere org.apache.kafka.clients.consumer.ConsumerRecord; importiere org.apache.kafka.clients.consumer.ConsumerRecords; importiere org.apache.kafka.clients.consumer.KafkaConsumer; importiere org.apache.kafka.common.serialization.StringDeserializer; /** * * Titel: KafkaConsumerTest * Beschreibung: * Kafka-Consumer-Demo * Version: 1.0.0 * @Autor pancm * @date 26. Januar 2018 */ öffentliche Klasse KafkaConsumerTest implementiert Runnable { privater finaler KafkaConsumer<String, String>-Verbraucher; private ConsumerRecords<String, String> msgList; privates endgültiges String-Thema; private statische endgültige Zeichenfolge GROUPID = "GruppeA"; öffentlicher KafkaConsumerTest(String topicName) { Eigenschaften-Eigenschaften = neue Eigenschaften(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", GRUPPENRID); props.put("aktivieren.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("Sitzung.Timeout.ms", "30000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("Wert.deserializer", StringDeserializer.class.getName()); this.consumer = neuer KafkaConsumer<String, String>(Eigenschaften); dieses.Thema = Themenname; this.consumer.subscribe(Arrays.asList(Thema)); } @Überschreiben öffentliche Leere ausführen() { int Nachrichtennummer = 1; System.out.println("---------Verbrauch starten---------"); versuchen { für (; ; ) { msgList = Verbraucher.Umfrage(1000); if (null != msgList && msgList.count() > 0) { für (ConsumerRecord<String, String> Datensatz : msgList) { //Drucken, nachdem 100 Datensätze verbraucht wurden, aber die gedruckten Daten folgen möglicherweise nicht diesem Muster System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // Zeichenfolge v = decodeUnicode(Datensatz.Wert()); // System.out.println(v); // Beenden, wenn 1000 Nachrichten verbraucht sind, if (messageNo % 1000 == 0) { brechen; } NachrichtNr++; } } anders { Thread.sleep(11); } } } Fang (UnterbrocheneAusnahme e) { e.printStackTrace(); Endlich Verbraucher.schließen(); } } öffentliche statische void main(String args[]) { KafkaConsumerTest test1 = neuer KafkaConsumerTest("Beispieldaten"); Thread thread1 = neuer Thread(test1); thread1.start(); } /* * Chinesisch in Unicode konvertieren*/ öffentliche statische Zeichenfolge gbEncoding(finale Zeichenfolge gbString) { char[] utfBytes = gbString.toCharArray(); Zeichenfolge unicodeBytes = ""; für (int i = 0; i < utfBytes.Länge; i++) { String hexB = Integer.toHexString(utfBytes[i]); wenn (hexB.Länge() <= 2) { hexB = "00" + hexB; } unicodeBytes = unicodeBytes + "\\u" + hexB; } UnicodeBytes zurückgeben; } /* * Unicode-Kodierung ins Chinesische*/ öffentlicher statischer String decodeUnicode(finaler String dataStr) { Geben Sie den Startwert ein. int Ende = 0; endgültiger StringBuffer-Puffer = neuer StringBuffer(); während (Start > -1) { Ende = dataStr.indexOf("\\u", Start + 2); Zeichenfolge charStr = ""; wenn (Ende == -1) { charStr = dataStr.substring(start + 2, dataStr.length()); } anders { charStr = dataStr.substring(start + 2, end); } char letter = (char) Integer.parseInt(charStr, 16); // Hexadezimale Ganzzahlzeichenfolge analysieren. Puffer.anhängen(neues Zeichen(Buchstabe).toString()); Anfang = Ende; } gibt buffer.toString() zurück; } } 2. Daten zur Tabelle bak1 hinzufügen TABELLE ERSTELLEN `bak1` ( `vin` varchar(20) NICHT NULL, `p1` doppelter STANDARD NULL, `p2` doppelter STANDARD NULL, `p3` doppelter STANDARD NULL, `p4` doppelter DEFAULT NULL, `p5` doppelter DEFAULT NULL, `p6` doppelter DEFAULT NULL, `p7` doppelter DEFAULT NULL, `p8` doppelter DEFAULT NULL, `p9` doppelter DEFAULT NULL, `p0` doppelter STANDARD NULL ) ENGINE=InnoDB STANDARD-CHARSET=utf8mb4 zeige, erstelle Tabelle bak1; in bak1 einfügen, wähle '李雷abcv', `p1`, `p2`, `p3`, `p4`, `p5`, `p6`, `p7`, `p8`, `p9`, `p0` von Moci-Grenze 10 3. Zeigen Sie die Ausgabeergebnisse an: Damit ist dieser Artikel über die Synchronisierung der vollständigen und inkrementellen Daten einer bestimmten MySQL-Tabelle mit einer Nachrichtenwarteschlange abgeschlossen – Lösung. Weitere Informationen zur Synchronisierung von Daten in einer bestimmten MySQL-Tabelle finden Sie in früheren Artikeln auf 123WORDPRESS.COM oder in den folgenden verwandten Artikeln. Ich hoffe, Sie werden 123WORDPRESS.COM auch in Zukunft unterstützen! Das könnte Sie auch interessieren:
|
<<: Eine detaillierte Einführung zum Einrichten von Jenkins auf Tencent Cloud Server
>>: HTML-Code zum Hinzufügen eines Mengenabzeichens zur Nachrichtenschaltfläche
In Fortsetzung des vorherigen Artikels erstellen ...
Shell-Skript #!/bin/sh # Aktuelles Verzeichnis CU...
Vorwort In MySQL werden datenbankübergreifende Ab...
Mit der Verbreitung von 3G nutzen immer mehr Mens...
In diesem Artikel wird der spezifische Code von j...
Um eine Tabelle in HTML zu zeichnen, verwenden Si...
Inhaltsverzeichnis 1. So finden Sie doppelte Zeil...
veranschaulichen Bei einer Eigeninstallation des ...
Installieren Sie pymysql pip install pymysql 2|0V...
Installationsvorschlag : Versuchen Sie, für die I...
Was ist sie? „em“ bezieht sich auf die Schrifthöhe...
Der CSS-Implementierungscode zum Festlegen des Bi...
In diesem Artikelbeispiel wird der spezifische Co...
Inhaltsverzeichnis 1. Die Rolle der Nginx-Prozess...
Nachdem Sie Docker auf dem Linux-Server installie...