Detaillierte Erläuterung der 4 gängigen Datenquellen in Spark SQL

Detaillierte Erläuterung der 4 gängigen Datenquellen in Spark SQL

Allgemeine Lade-/Schreibmethoden

Optionen manuell angeben

Die DataFrame-Schnittstelle von Spark SQL unterstützt Vorgänge an mehreren Datenquellen. Ein DataFrame kann auf die gleiche Weise wie RDDs bearbeitet werden und kann auch als temporäre Tabelle registriert werden. Nachdem Sie den DataFrame als temporäre Tabelle registriert haben, können Sie SQL-Abfragen auf dem DataFrame ausführen.

Die Standarddatenquelle für Spark SQL ist im Parquet-Format. Wenn die Datenquelle eine Parquet-Datei ist, kann Spark SQL alle Vorgänge problemlos ausführen.

Ändern Sie das Konfigurationselement spark.sql.sources.default, um das Standarddatenquellenformat zu ändern.

scala> val df = spark.read.load("hdfs://hadoop001:9000/namesAndAges.parquet")
df: org.apache.spark.sql.DataFrame = [Alter: Bigint, Name: Zeichenfolge]
scala> df.select("Name").schreiben.speichern("Namen.Parkett")

Wenn das Datenquellenformat keine Parquet-Datei ist, müssen Sie das Datenquellenformat manuell angeben. Das Datenquellenformat muss den vollständigen Namen angeben (z. B. org.apache.spark.sql.parquet). Wenn das Datenquellenformat ein integriertes Format ist, müssen Sie nur die Abkürzungen json, parquet, jdbc, orc, libsvm, csv, text angeben, um das Datenformat anzugeben.

Sie können die von SparkSession bereitgestellte Methode read.load zum allgemeinen Laden von Daten verwenden und write und save zum Speichern von Daten verwenden.

scala> val peopleDF = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [Alter: Bigint, Name: Zeichenfolge]
scala> peopleDF.write.format("parquet").speichern("hdfs://hadoop001:9000/namesAndAges.parquet")
scala>

Darüber hinaus können Sie SQL direkt auf der Datei ausführen:

val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://hadoop001:9000/namesAndAges.parquet`")
sqlDF.show()

Optionen zum Speichern von Dateien

Mit SaveMode können Speichervorgänge ausgeführt werden. SaveMode definiert den Datenverarbeitungsmodus. Es ist wichtig zu beachten, dass diese Speichermodi keine Sperren verwenden und keine atomaren Operationen sind. Darüber hinaus werden bei Verwendung der Overwrite-Methode die Originaldaten gelöscht, bevor die neuen Daten ausgegeben werden. Der SaveMode wird in der folgenden Tabelle detailliert beschrieben:

Scala/Java Jede Sprache Bedeutung
SaveMode.ErrorIfExists (Standard) "Fehler" (Standard) Wenn die Datei existiert, wird ein Fehler gemeldet.
SaveMode.Anhängen "anhängen" Anhängen
SaveMode.Überschreiben "überschreiben" Überschreiben
SaveMode.Ignore "ignorieren" Wenn Daten vorhanden sind, ignorieren Sie sie

Parquet-Dateien

Parquet lesen und schreiben

Das Parquet-Format wird häufig im Hadoop-Ökosystem verwendet und unterstützt auch alle Datentypen von Spark SQL. Spark SQL bietet Methoden zum direkten Lesen und Speichern von Dateien im Parquet-Format.

// Encoder für die gängigsten Typen werden automatisch durch den Import von spark.implicits bereitgestellt._
importiere spark.implicits._
val peopleDF = spark.read.json("Beispiele/src/main/Ressourcen/people.json")
// DataFrames können als Parquet-Dateien gespeichert werden, wobei die Schemainformationen erhalten bleiben
LeuteDF.write.parquet("hdfs://hadoop001:9000/leute.parquet")
// Lesen Sie die oben erstellte Parquet-Datei ein
// Parquet-Dateien sind selbstbeschreibend, sodass das Schema erhalten bleibt
// Das Ergebnis des Ladens einer Parquet-Datei ist auch ein DataFrame
val parquetFileDF = spark.read.parquet("hdfs://hadoop001:9000/people.parquet")
// Parquet-Dateien können auch zum Erstellen einer temporären Ansicht verwendet und dann in SQL-Anweisungen verwendet werden
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attribute(0)).show()
// +------------+
// | Wert|
// +------------+
// |Name: Justin|
// +------------+

Partitionsinformationen analysieren

Das Partitionieren einer Tabelle ist eine Möglichkeit zur Datenoptimierung. In einer partitionierten Tabelle werden Daten mithilfe von Partitionsspalten in verschiedenen Verzeichnissen gespeichert. Parquet-Datenquellen können jetzt Partitionsinformationen automatisch erkennen und auflösen. Um beispielsweise Bevölkerungsdaten in die Spalten „Geschlecht“ und „Land“ zu unterteilen, verwenden Sie die folgende Verzeichnisstruktur:

Weg
└── zu
└── Tabelle
├── Geschlecht=männlich
│ ├── …
│ │
│ ├── Land = USA
│ │ └── Daten.parquet
│ ├── Land=CN
│ │ └── Daten.parquet
│ └── …
└── Geschlecht=weiblich
├── ...
│
├── Land = USA
│ └── Daten.parquet
├── Land=CN
│ └── Daten.parquet
└── …

Durch Übergeben von Pfad/zur/Tabelle an SQLContext.read.parque

oder SQLContext.read.load, Spark SQL löst die Partitionsinformationen automatisch auf.

Das Schema des zurückgegebenen DataFrame ist wie folgt:

Wurzel
|-- Name: Zeichenfolge (nullwertfähig = wahr)
|-- Alter: lang (nullwertig = wahr)
|-- Geschlecht: Zeichenfolge (nullwertig = wahr)
|-- Land: Zeichenfolge (nullwertfähig = wahr)

Es ist zu beachten, dass der Datentyp der Partitionsspalte der Daten automatisch analysiert wird. Derzeit werden numerische und Zeichenfolgentypen unterstützt. Die Parameter zum automatischen Parsen von Partitionstypen sind:

spark.sql.sources.partitionColumnTypeInference.enabled

Der Standardwert ist „true“.

Wenn Sie diese Funktion deaktivieren möchten, setzen Sie diesen Parameter einfach auf „deaktiviert“. Zu diesem Zeitpunkt wird das Datenformat der Partitionsspalte standardmäßig auf den Zeichenfolgentyp eingestellt und es wird keine Typanalyse mehr durchgeführt.

Schemazusammenführung

Wie ProtocolBuffer, Avro und Thrift unterstützt auch Parquet die Schemaentwicklung. Benutzer können zunächst ein einfaches Schema definieren und dann nach und nach Spaltenbeschreibungen zum Schema hinzufügen. Auf diese Weise können Benutzer mehrere Parquet-Dateien mit unterschiedlichen Schemata erhalten, die jedoch miteinander kompatibel sind. Die Parquet-Datenquelle erkennt diese Situation jetzt automatisch und führt die Schemas dieser Dateien zusammen.

Da das Zusammenführen von Schemas ein kostspieliger Vorgang ist und in den meisten Fällen nicht erforderlich ist, ist diese Funktion in Spark SQL seit 1.5.0 standardmäßig deaktiviert. Sie können diese Funktion auf zwei Arten aktivieren:

Wenn die Datenquelle eine Parquet-Datei ist, setzen Sie die Datenquellenoption „mergeSchema“ auf „true“.

So legen Sie globale SQL-Optionen fest:

spark.sql.parquet.mergeSchema ist wahr.

// In diesem Beispiel wird sqlContext aus dem vorherigen Beispiel verwendet.
// Dies wird verwendet, um ein RDD implizit in einen DataFrame zu konvertieren.
importiere spark.implicits._
//Erstellen Sie einen einfachen DataFrame, der in einem Partitionsverzeichnis gespeichert wird
val df1 = sc.makeRDD(1 bis 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("hdfs://hadoop001:9000/data/test_table/key=1")
// Einen weiteren DataFrame in einem neuen Partitionsverzeichnis erstellen,
// Hinzufügen einer neuen Spalte und Löschen einer vorhandenen Spalte
val df2 = sc.makeRDD(6 bis 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("hdfs://hadoop001:9000/data/test_table/key=2")
// Lesen Sie die partitionierte Tabelle
val df3 = spark.read.option("mergeSchema", "true").parquet("hdfs://hadoop001:9000/data/test_table")
df3.printSchema()
// Das endgültige Schema besteht aus allen 3 Spalten in den Parquet-Dateien zusammen
// mit der Partitionierungsspalte, die in den Partitionsverzeichnispfaden angezeigt wurde.
//Wurzel
// |-- einzeln: int (nullwertfähig = wahr)
// |-- double: int (nullwertfähig = wahr)
// |-- triple: int (nullwertfähig = wahr)
// |-- Schlüssel: int (nullwertfähig = wahr)

Hive-Datenquelle

Apache Hive ist die SQL-Engine für Hadoop und Spark SQL kann mit oder ohne Hive-Unterstützung kompiliert werden. Spark SQL mit Hive-Unterstützung kann Hive-Tabellenzugriff, UDF (benutzerdefinierte Funktion) und Hive-Abfragesprache (HiveQL/HQL) usw. unterstützen. Ein Punkt, der betont werden muss, ist, dass Sie Hive nicht im Voraus installieren müssen, wenn Sie Hive-Bibliotheken in Spark SQL einbinden möchten. Generell ist es am besten, Spark SQL mit Hive-Unterstützung zu kompilieren, damit Sie diese Funktionen nutzen können. Wenn Sie eine Binärversion von Spark heruntergeladen haben, sollte diese mit hinzugefügter Hive-Unterstützung kompiliert worden sein.

Um Spark SQL mit einem bereitgestellten Hive zu verbinden, müssen Sie hive-site.xml in das Spark-Konfigurationsdateiverzeichnis ($SPARK_HOME/conf) kopieren. Spark SQL kann ausgeführt werden, auch wenn Hive nicht bereitgestellt ist.

Beachten Sie, dass Spark SQL, wenn Sie Hive nicht bereitgestellt haben, sein eigenes Hive-Metadaten-Warehouse mit dem Namen metastore_db im aktuellen Arbeitsverzeichnis erstellt. Wenn Sie außerdem versuchen, Tabellen mit der Anweisung CREATE TABLE (nicht CREATE EXTERNAL TABLE) in HiveQL zu erstellen, werden die Tabellen im Verzeichnis /user/hive/warehouse in Ihrem Standarddateisystem abgelegt (HDFS, wenn Sie eine konfigurierte hdfs-site.xml in Ihrem Klassenpfad haben, andernfalls ist es das lokale Dateisystem).

importiere java.io.File
importiere org.apache.spark.sql.Row
importiere org.apache.spark.sql.SparkSession
Fallklasse Record (Schlüssel: Int, Wert: String)
// warehouseLocation verweist auf den Standardspeicherort für verwaltete Datenbanken und Tabellen
val warehouseLocation = neue Datei("spark-warehouse").getAbsolutePath
val spark = SparkSession
.Erbauer()
.appName("Spark Hive-Beispiel")
.config("spark.sql.warehouse.dir", Lagerort)
.enableHiveSupport()
.getOrCreate()
importiere spark.implicits._
spark.sql importieren
sql("TABELLE ERSTELLEN, WENN NICHT VORHANDEN src (Schlüssel INT, Wert STRING)")
sql("DATEN LOKALEN INPATH 'examples/src/main/resources/kv1.txt' IN TABELLE src laden")
// Abfragen werden in HiveQL ausgedrückt
sql("SELECT * FROM src").anzeigen()
// +---+--------+
// |Schlüssel| Wert|
// +---+--------+
//|238|val_238|
//| 86| Wert_86|
//|311|val_311|
// ...
// Aggregationsabfragen werden ebenfalls unterstützt.
sql("SELECT COUNT(*) FROM src").anzeigen()
// +--------+
// |Anzahl(1)|
// +--------+
// | 500 |
// +--------+
// Die Ergebnisse von SQL-Abfragen sind selbst DataFrames und unterstützen alle normalen Funktionen.
val sqlDF = sql("SELECT-Schlüssel, Wert FROM src WHERE-Schlüssel < 10 ORDER BY-Schlüssel")
// Die Elemente in DataFrames sind vom Typ „Row“, wodurch Sie auf jede Spalte per Ordinalzahl zugreifen können.
val stringsDS = sqlDF.map {
Fall Zeile (Schlüssel: Int, Wert: String) => s"Schlüssel: $Schlüssel, Wert: $Wert"
}
stringsDS.show()
// +--------------------+
// | Wert|
// +--------------------+
// |Schlüssel: 0, Wert: val_0|
// |Schlüssel: 0, Wert: val_0|
// |Schlüssel: 0, Wert: val_0|
// ...
// Sie können DataFrames auch verwenden, um temporäre Ansichten innerhalb einer SparkSession zu erstellen.
val recordsDF = spark.createDataFrame((1 bis 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("Aufzeichnungen")
// Abfragen können dann DataFrame-Daten mit in Hive gespeicherten Daten verknüpfen.
sql("SELECT * FROM Datensätze r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |Schlüssel| Wert|Schlüssel| Wert|
// +---+------+---+------+
// | 2| Wert_2| 2| Wert_2|
// | 4| Wert_4| 4| Wert_4|
// | 5| Wert_5| 5| Wert_5|
// ...

Eingebettete Hive-Anwendung

Wenn Sie den integrierten Hive verwenden möchten, müssen Sie nichts tun, sondern ihn einfach direkt verwenden. –conf:

spark.sql.warehouse.dir=

Hinweis: Wenn Sie den internen Hive verwenden, wird nach Spark 2.0 spark.sql.warehouse.dir verwendet, um die Adresse des Data Warehouse anzugeben. Wenn Sie HDFS als Pfad verwenden müssen, müssen Sie core-site.xml und hdfs-site.xml zum Spark-Conf-Verzeichnis hinzufügen. Andernfalls wird nur das Warehouse-Verzeichnis auf dem Masterknoten erstellt und während der Abfrage tritt das Problem auf, dass die Datei nicht gefunden wird. In diesem Fall müssen Sie HDFS verwenden, den Metastore löschen und den Cluster neu starten.

Externe Hive-Anwendung

Wenn Sie eine Verbindung zu einem extern bereitgestellten Hive herstellen möchten, müssen Sie die folgenden Schritte ausführen.

a) Kopieren Sie die Datei hive-site.xml in Hive oder führen Sie einen Softlink dazu durch in das Verzeichnis conf unter dem Spark-Installationsverzeichnis.

b Öffnen Sie die Spark-Shell und bringen Sie den JDBC-Client mit, um auf die Hive-Metabasis zuzugreifen.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

JSON-Datensatz

Spark SQL kann die Struktur eines JSON-Datensatzes automatisch ableiten und ihn als Dataset[Row] laden. Sie können SparkSession.read.json() verwenden, um ein Dataset[String] oder eine JSON-Datei zu laden. Beachten Sie, dass diese JSON-Datei keine herkömmliche JSON-Datei ist. Jede Zeile muss eine JSON-Zeichenfolge sein.

{"name":"Michael"}
{"Name":"Andy", "Alter":30}
{"Name":"Justin", "Alter":19}
// Primitive Typen (Int, String, etc.) und Produkttypen (Case-Klassen) Encoder sind
// wird unterstützt, indem dies beim Erstellen eines Datensatzes importiert wird.
importiere spark.implicits._
// Der Pfad zeigt auf einen JSON-Datensatz.
// Der Pfad kann entweder eine einzelne Textdatei oder ein Verzeichnis sein, in dem Textdateien gespeichert sind
val Pfad = "Beispiele/Quelle/Haupt/Ressourcen/Personen.json"
val peopleDF = spark.read.json(Pfad)
// Das abgeleitete Schema kann mit der Methode printSchema() visualisiert werden
peopleDF.printSchema()
//Wurzel
// |-- Alter: lang (nullwertig = wahr)
// |-- Name: Zeichenfolge (nullwertfähig = wahr)
// Erstellt eine temporäre Ansicht unter Verwendung des DataFrame
peopleDF.createOrReplaceTempView("Menschen")
// SQL-Anweisungen können mithilfe der von Spark bereitgestellten SQL-Methoden ausgeführt werden
val teenagerNamesDF = spark.sql("WÄHLEN SIE NAMEN AUS PERSONEN MIT EINEM ALTER ZWISCHEN 13 UND 19")
teenagerNamesDF.show()
// +------+
// | Name|
// +------+
// |Justin|
// +------+
// Alternativ kann ein DataFrame für einen JSON-Datensatz erstellt werden, der dargestellt wird durch
// ein Dataset[String], das ein JSON-Objekt pro String speichert
Wert otherPeopleDataset = spark.createDataset(
"""{"Name":"Yin","Adresse":{"Stadt":"Columbus","Staat":"Ohio"}}""" :: Nil)
val anderePersonen = spark.read.json(anderePersonenDataset)
anderePersonen.show()
// +---------------+----+
// | Adresse|Name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

JDBC

Spark SQL kann DataFrames erstellen, indem es Daten aus einer relationalen Datenbank über JDBC liest. Nach einer Reihe von Berechnungen im DataFrame können die Daten in die relationale Datenbank zurückgeschrieben werden.

Beachten Sie, dass Sie den relevanten Datenbanktreiber in den Spark-Klassenpfad einfügen müssen.

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar
// Hinweis: Das Laden und Speichern von JDBC kann entweder über die Methoden load/save oder jdbc erfolgen.
// Daten aus einer JDBC-Quelle laden
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/rdd").option("dbtable", " rddtable").option("Benutzer", "root").option("Passwort", "hive").load()
val connectionProperties = neue Eigenschaften()
connectionProperties.put("Benutzer", "root")
connectionProperties.put("Passwort", "Hive")
val jdbcDF2 = spark.read
.jdbc("jdbc:mysql://hadoop001:3306/rdd", "rddtable", Verbindungseigenschaften)
// Daten in einer JDBC-Quelle speichern
jdbcDF.schreiben
.format("jdbc")
.option("url", "jdbc:mysql://hadoop001:3306/rdd")
.option("dbtable", "rddtable2")
.option("Benutzer", "root")
.option("Passwort", "Hive")
.speichern()
jdbcDF2.schreiben
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", Verbindungseigenschaften)
// Datentypen für Tabellenspalten beim Schreiben festlegen
jdbcDF.schreiben
.option("createTableColumnTypes", "Name CHAR(64), Kommentare VARCHAR(1024)")
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", Verbindungseigenschaften)

Das Obige ist der vollständige Inhalt dieses Artikels. Ich hoffe, er wird für jedermanns Studium hilfreich sein. Ich hoffe auch, dass jeder 123WORDPRESS.COM unterstützen wird.

Das könnte Sie auch interessieren:
  • Spark SQL 2.4.8 Zwei Möglichkeiten zum Bedienen von Dataframe
  • Detaillierter Prozess zum Erstellen von SparkSession und sparkSQL
  • IDEA-Entwicklung und Konfiguration von SparkSQL und einfachem Use-Case-Code
  • Analyse der gesamten Implementierungslogik von Spark SQL
  • Implementierung von Pyspark zum Lesen und Schreiben einer MySQL-Datenbank
  • SparkSQL liest Hive-Daten und führt die lokale Idee detailliert aus
  • Apache Hudi integriert Spark SQL zum Bedienen von Hide-Tabellen

<<:  Grundlegende Implementierung der AOP-Programmierung in JavaScript

>>:  So installieren Sie Nginx in CentOS

Artikel empfehlen

Tutorial zur Installation von MySQL 5.7.28 auf CentOS 6.2 (MySQL-Hinweise)

1. Umweltvorbereitung 1.MySQL-Installationspfad: ...

Implementierung der Einzelprozesssteuerung des Linux C-Hintergrunddienstprogramms

einführen Normalerweise muss ein Hintergrundserve...

Vue implementiert die Anzeige und Ausblendung der dreistufigen Navigation

In diesem Artikelbeispiel wird der spezifische Co...

Mysql-Datenbankdesign - Analyse von drei Paradigmenbeispielen

Drei Paradigmen 1NF: Felder sind untrennbar; 2NF:...

Die „3I“-Standards für erfolgreiche Printwerbung

Für viele inländische Werbetreibende ist die Erste...

So öffnen Sie eine Seite in einem Iframe

Lösung: Setzen Sie den Zielattributwert des Links ...

Detailliertes Beispiel zum Ändern des Linux-Kontokennworts

Passwort des persönlichen Kontos ändern Wenn norm...

Ein kurzer Vortrag über den MySQL-Index und die Redis-Sprungtabelle

Zusammenfassung Bei der Diskussion über MySQL-Ind...

Hallo – Erfahrungsaustausch zum Entwurf von Dialogfeldern

„Was ist los?“ Sofern Sie nicht an bestimmte Arten...

So starten Sie mehrere MySQL-Datenbanken auf einem Linux-Host

Lassen Sie uns heute darüber sprechen, wie Sie vi...

display:grid in CSS3, eine Einführung in das Rasterlayout

1. Rasterlayout (Raster): Es unterteilt die Webse...

Grafisches Tutorial zur Installation und Konfiguration von MySQL 8.0.17

In diesem Artikel wird die Installations- und Kon...