Vorwort Wir müssen häufig etwas basierend auf bestimmten Vorgängen tun, die Benutzer mit ihren eigenen Daten durchführen. Wenn ein Benutzer beispielsweise sein Konto löscht, senden wir ihm eine Textnachricht, um ihn zu tadeln und ihn anzuflehen, zurückzukommen. Eine ähnliche Funktion kann sicherlich in der Geschäftslogikschicht implementiert werden, und dieser Vorgang kann nach Erhalt der Löschanforderung des Benutzers ausgeführt werden, aber das Datenbank-Binlog bietet uns eine andere Vorgangsmethode. Um das Binärprotokoll zu überwachen, sind zwei Schritte erforderlich. Der erste Schritt besteht natürlich darin, diese Funktion in Ihrem MySQL zu aktivieren, und der zweite Schritt besteht darin, ein Programm zum Lesen des Protokolls zu schreiben. MySQL aktiviert Binlog. Zunächst einmal ist das Binärprotokoll von MySQL normalerweise nicht geöffnet. Daher benötigen wir: Suchen Sie die MySQL-Konfigurationsdatei my.cnf. Der Speicherort kann je nach Betriebssystem unterschiedlich sein. Sie können es selbst finden. Fügen Sie den folgenden Inhalt hinzu: [mysqld] Server-ID = 1 log-bin = mysql-bin Binlog-Format = ROW Starten Sie dann MySQL neu. /ubuntu Dienst MySQL Neustart // Mac mysql.server neu starten Überwachen Sie, ob die Aktivierung erfolgreich war Geben Sie die MySQL-Befehlszeile ein und führen Sie Folgendes aus: Variablen wie „%log_bin%“ anzeigen; Wenn das Ergebnis wie unten dargestellt aussieht, war der Vorgang erfolgreich: Zeigen Sie den Status des geschriebenen Binärprotokolls an: Code zum Lesen des Binärprotokolls Einführung von Abhängigkeiten Wir verwenden einige Open-Source-Implementierungen. Aus seltsamen Gründen habe ich das Paket mysql-binlog-connector-java (offizielles GitHub-Repository) ausgewählt [github.com/shyiko/mysq…]. Die spezifischen Abhängigkeiten sind wie folgt: <!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java --> <Abhängigkeit> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.17.0</version> </Abhängigkeit> Natürlich gibt es viele Open-Source-Implementierungen für die Binlog-Verarbeitung. Alibabas „cancl“ ist eine davon und Sie können sie auch verwenden. Schreiben Sie eine Demo Schreiben wir gemäß der Readme-Datei im offiziellen Repository einfach eine Demo. öffentliche statische void main(String[] args) { BinaryLogClient-Client = neuer BinaryLogClient("Hostname", 3306, "Benutzername", "Passwort"); EventDeserializer eventDeserializer = neuer EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); client.registerEventListener(neuer BinaryLogClient.EventListener() { @Überschreiben öffentliche void beiEreignis(Ereignisereignis) { //ZU TUN tuetwas(); logger.info(Ereignis.toString()); } }); client.verbinden(); } Dies ist vollständig gemäß dem offiziellen Tutorial geschrieben. Sie können Ihre eigene Geschäftslogik in onEvent schreiben. Da ich nur teste, drucke ich jedes darin enthaltene Ereignis aus. Danach habe ich mich manuell bei MySQL angemeldet und die entsprechenden Hinzufügungs-, Änderungs- und Löschvorgänge ausgeführt. Die überwachten Protokolle waren wie folgt:
Erstellen Sie eine bessere und individuellere Werkzeugklasse entsprechend Ihrem eigenen Unternehmen Ich wollte den Code zunächst veröffentlichen, aber als der Code immer länger wurde, beschloss ich, ihn auf GitHub hochzuladen. Hier werde ich nur einen Teil der Implementierung veröffentlichen. Code-Transferportal Umsetzungsideen
Die Umsetzungsideen lauten also in etwa wie folgt:
Initialisierungscode: öffentliche MysqlBinLogListener(Conf conf) { BinaryLogClient-Client = neuer BinaryLogClient (conf.host, conf.port, conf.username, conf.passwd); EventDeserializer eventDeserializer = neuer EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY ); client.setEventDeserializer(eventDeserializer); dies.parseClient = Client; diese.Warteschlange = neue ArrayBlockingQueue<>(1024); diese.conf = conf; Listener = neue ConcurrentHashMap<>(); dbTableCols = neue ConcurrentHashMap<>(); this.consumer = Executors.newFixedThreadPool(consumerThreads); } Registrierungscode: public void regListener(String db, String table, BinLogListener listener) throws Exception { : Zeichenfolge dbTable = getdbTable(db, Tabelle); Klasse.fürName("com.mysql.jdbc.Driver"); // Speichern Sie die Spalteninformationen der aktuell registrierten Tabelle Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd); Map<String, Colum> cols = getColMap(Verbindung, Datenbank, Tabelle); dbTableCols.put(dbTable, cols); //Speichern Sie den aktuell registrierten Listener Liste<BinLogListener> Liste = Listener.getOrDefault(dbTable, neue ArrayList<>()); Liste.Hinzufügen(Listener); listeners.put(dbTable, Liste); } In diesem Schritt erhalten wir die Schemainformationen der Tabelle, während wir den Listener registrieren, und speichern sie in der Karte, um die nachfolgende Datenverarbeitung zu erleichtern. Abhörcode: @Überschreiben öffentliche void beiEreignis(Ereignisereignis) { Ereignistyp Ereignistyp = Ereignis.getHeader().getEventType(); wenn (Ereignistyp == Ereignistyp.TABLE_MAP) { TableMapEventData tableData = event.getData(); : String db = tableData.getDatabase(); Zeichenfolgetabelle = tableData.getTable(); dbTable = getdbTable(db, Tabelle); } // Behandeln Sie nur die drei Vorgänge Hinzufügen, Löschen und Aktualisieren, wenn (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) { wenn (istSchreiben(Ereignistyp)) { WriteRowsEventData-Daten = event.getData(); für (Serializable[] Zeile: data.getRows()) { wenn (dbTableCols.containsKey(dbTable)) { LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable)); e.setDbTable(dbTable); Warteschlange hinzufügen(e); } } } } } Ich bin faul,,, hier implementiere ich nur die Verarbeitung der Add-Operation und habe keine anderen Operationen geschrieben. Verbrauchscode: public void parse() wirft IOException { parseClient.registerEventListener(dies); für (int i = 0; i < consumerThreads; i++) { Verbraucher.submit(() -> { während (wahr) { if (queue.size() > 0) { versuchen { LogItem-Element = Warteschlange.take(); : Zeichenfolge dbtable = item.getDbTable(); listeners.get(dbtable).forEach(l -> { l.beiEreignis(Element); }); } Fang (UnterbrocheneAusnahme e) { e.printStackTrace(); } } Thread.sleep(1000); } }); } parseClient.connect(); } Holen Sie sich beim Verbrauch das Element aus der Warteschlange und lassen Sie dann den entsprechenden Listener bzw. die entsprechenden Listener das Element entsprechend verwenden. Testcode: öffentliche statische void main(String[] args) wirft Exception { Conf conf = neue Conf(); conf.host = "Hostname"; conf.port = 3306; conf.Benutzername = conf.Passwort = "hhsgsb"; MysqlBinLogListener mysqlBinLogListener = neuer MysqlBinLogListener(conf); mysqlBinLogListener.parseArgsAndRun(args); mysqlBinLogListener.regListener("pf", "student", item -> { System.out.println(neuer String((byte[])item.getAfter().get("name"))); logger.info("einfügen in {}, Wert = {}", item.getDbTable(), item.getAfter()); }); mysqlBinLogListener.regListener("pf", "Lehrer", Element -> System.out.println("Lehrer ====")); mysqlBinLogListener.parse(); } In diesem kurzen Code werden zwei Listener registriert, die jeweils die Schüler- und Lehrertabellen abhören und ausdrucken. Nach dem Testen kann die definierte Geschäftslogik beim Einfügen von Daten in die Lehrertabelle unabhängig ausgeführt werden. Hinweis: Die Tool-Klasse kann hier nicht direkt verwendet werden, da viele Ausnahmebehandlungen nicht durchgeführt werden und die Funktion nur auf die Einfügeanweisung hört, die als Referenz für die Implementierung verwendet werden kann. Verweise
Zusammenfassen Das Obige ist der vollständige Inhalt dieses Artikels. Ich hoffe, dass der Inhalt dieses Artikels einen gewissen Lernwert für Ihr Studium oder Ihre Arbeit hat. Vielen Dank für Ihre Unterstützung von 123WORDPRESS.COM. Das könnte Sie auch interessieren:
|
<<: vitrualBox+ubuntu16.04 installieren Sie Python3.6, neuestes Tutorial und detaillierte Schritte
Es gibt bereits viele Artikel über Slot-Scope auf...
MySQL selbst unterstützt keine rekursive Syntax, ...
Vorwort Wenn wir bestimmte Zeilen in einer Datei ...
1. Erstellen einfügen in [Tabellenname] (Feld1, F...
Inhaltsverzeichnis 1. Bootstrap5-Haltepunkte 1.1 ...
Inhaltsverzeichnis 1.parseInt(Zeichenfolge, Basis...
Inhaltsverzeichnis Vorwort InnoDB-Speicherarchite...
MySQL ist ein relationales Datenbankverwaltungssy...
Docker virtualisiert eine Brücke auf dem Host-Rec...
Heute habe ich von Alibaba Cloud eine Festplatten...
Wenn Sie MySQL 5.7 verwenden, werden Sie feststel...
Vorwort Als mein Team das Steuersystemmodul entwi...
Was ist ein Speicherleck? Ein Speicherleck bedeut...
1. Quellcode entwerfen Code kopieren Der Code laut...
Inhaltsverzeichnis FileReader liest lokale Dateie...