Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD, because the results are returned as a DataFrame and can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python, as it does not require the user to provide a ClassTag. (Note that this is different from the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL.)
First you need to add the JDBC driver for your specific database to the Spark classpath. For example, to connect to Postgres from the Spark shell, you would run the following command:
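A minimal invocation, assuming the PostgreSQL driver jar is named postgresql-42.6.0.jar (an illustrative file name; substitute the jar you actually downloaded):

./bin/spark-shell --driver-class-path postgresql-42.6.0.jar --jars postgresql-42.6.0.jar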
Spark supports the following case-insensitive options for JDBC. JDBC data source options can be set via the .option/.options methods of DataFrameReader, DataFrameWriter, DataStreamReader, and DataStreamWriter, or via the OPTIONS clause of CREATE TABLE USING DATA_SOURCE.
For connection properties, users can specify the JDBC connection properties in the data source options. user and password are normally provided as connection properties for logging in to the data sources.
url: The JDBC URL of the form jdbc:subprotocol:subname to connect to. Source-specific connection properties can be specified in the URL, e.g. jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable: The JDBC table that should be read from or written into. Note that when it is used in the read path, anything that is valid in a FROM clause of a SQL query can be used; for example, instead of a full table you could use a subquery in parentheses. It is not allowed to specify the dbtable and query options at the same time.
query: A query used to read data into Spark. The specified query is parenthesized and used as a subquery in the FROM clause. Spark also assigns an alias to the subquery clause. As an example, Spark issues a query of the following form to the JDBC source: SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
Below are a couple of restrictions when using this option.
- It is not allowed to specify the dbtable and query options at the same time.
- It is not allowed to specify the query and partitionColumn options at the same time. When the partitionColumn option is required, the subquery can be specified with the dbtable option instead, and the partition columns can be qualified using the subquery alias provided as part of dbtable.
Example:
spark.read.format("jdbc")
.option("URL", jdbcUrl)
.option("query", "select c1, c2 from t1")
.Burden()
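For comparison, a minimal sketch of the workaround described in the second restriction, where the subquery is supplied through dbtable so that a partition column can be qualified via the subquery alias; jdbcUrl, the table t1, and the numeric column c1 are assumed placeholders:

spark.read.format("jdbc")
  .option("url", jdbcUrl)
  // the subquery is given an explicit alias so the partition column can be qualified with it
  .option("dbtable", "(select c1, c2 from t1) as subq")
  .option("partitionColumn", "subq.c1")
  .option("lowerBound", "0")
  .option("upperBound", "1000")
  .option("numPartitions", "4")
  .load()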
driver: The class name of the JDBC driver to use to connect to this URL.
partitionColumn, lowerBound, upperBound: These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how the table is partitioned when it is read in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Note that lowerBound and upperBound are only used to decide the partition stride, not to filter the rows in the table; therefore, all rows in the table are partitioned and returned. These options apply only to reading.
numPartitions: The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, Spark decreases it to this limit by calling coalesce(numPartitions) before writing.
queryTimeout (default: 0): The number of seconds the driver waits for a Statement object to execute; zero means there is no limit. In the write path, this option depends on how the JDBC driver implements the setQueryTimeout API; for example, the h2 JDBC driver checks the timeout of each query instead of the entire JDBC batch.
fetchsize (default: 0): The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers that default to a low fetch size. This option applies only to reading.
batchsize (default: 1000): The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing.
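A minimal sketch of how these tuning options might be combined; spark, jdbcUrl, and the table names are assumed placeholders, and the numeric values are illustrative only:

val tunedDF = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "schema.tablename")
  .option("queryTimeout", "30")   // fail statements that run longer than 30 seconds
  .option("fetchsize", "1000")    // fetch 1000 rows per round trip
  .load()

tunedDF.write.format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "schema.tablename_copy")
  .option("batchsize", "5000")    // insert 5000 rows per round trip
  .save()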
isolationLevel (default: READ_UNCOMMITTED): The transaction isolation level, which applies to the current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to the standard transaction isolation levels defined by JDBC's Connection object. Please refer to the documentation of java.sql.Connection. This option applies only to writing.
sessionInitStatement: After each database session is opened to the remote database and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block), which can be used to implement session initialization code. Example: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate (default: false): This is a JDBC writer-related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient and avoids removing the table metadata (e.g. indices). However, it does not work in some cases, for example when the new data has a different schema. In case of failures, users should turn off the truncate option to fall back to DROP TABLE. Also, due to the different behavior of TRUNCATE TABLE across DBMSs, it is not always safe to use this. MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect support it, while PostgresDialect and the default JDBCDialect do not. For unknown and unsupported JDBCDialects, the user option truncate is ignored.
cascadeTruncate (default: the cascading truncate behaviour of the JDBC database in question, as specified in isCascadeTruncate of each JDBCDialect): This is a JDBC writer-related option. If enabled and supported by the JDBC database, it allows execution of a TRUNCATE TABLE t CASCADE (in the case of PostgreSQL, a TRUNCATE TABLE ONLY t CASCADE is executed to prevent accidentally truncating descendant tables). This affects other tables and should therefore be used with care.
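A minimal write sketch, assuming df and jdbcUrl exist and the target table already has a matching schema; SaveMode.Overwrite combined with truncate replaces the rows without dropping the table:

import org.apache.spark.sql.SaveMode

df.write.format("jdbc")
  .mode(SaveMode.Overwrite)
  .option("url", jdbcUrl)
  .option("dbtable", "schema.tablename")
  .option("truncate", "true")  // keep the existing table and its indexes; only replace the rows
  .option("user", "username")
  .option("password", "password")
  .save()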
createTableOptions: This is a JDBC writer-related option. If specified, this option allows setting database-specific table and partition options when creating a table (e.g. CREATE TABLE t (name string) ENGINE=InnoDB).
createTableColumnTypes: The database column data types to use instead of the defaults when creating the table. Data type information should be specified in the same format as CREATE TABLE column syntax (e.g. "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid Spark SQL data types. This option applies only to writing.
customSchema: The custom schema to use for reading data from JDBC connectors, for example "id DECIMAL(38, 0), name STRING". You can also specify partial fields, and the others use the default type mapping; for example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of the JDBC table. Users can specify the corresponding Spark SQL data types instead of using the defaults. This option applies only to reading.
pushDownPredicate (default: true): The option to enable or disable predicate push-down into the JDBC data source. When true, Spark pushes filters down to the JDBC data source as far as possible; when false, no filter is pushed down and all filtering is handled by Spark.
pushDownAggregate (default: false): The option to enable or disable aggregate push-down into the V2 JDBC data source. If numPartitions equals 1 or the group-by key is the same as partitionColumn, Spark pushes the aggregate down to the data source completely and does not apply a final aggregate over the data source output. Otherwise, Spark applies a final aggregate over the data source output.
pushDownLimit (default: false): The option to enable or disable LIMIT push-down into the V2 JDBC data source. If numPartitions is greater than 1, Spark still applies LIMIT or LIMIT with SORT to the result from the data source even when LIMIT or LIMIT with SORT is pushed down. Otherwise, if LIMIT or LIMIT with SORT is pushed down and numPartitions equals 1, Spark does not apply LIMIT or LIMIT with SORT to the result from the data source.
pushDownTableSample (default: false): The option to enable or disable TABLESAMPLE push-down into the V2 JDBC data source.
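A hedged sketch of disabling predicate push-down, for cases where Spark filters faster than the database; jdbcUrl and a table with a numeric column c1 are assumed placeholders:

val filteredDF = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "schema.tablename")
  .option("pushDownPredicate", "false") // the filter below is evaluated by Spark instead of the database
  .load()
  .filter("c1 > 100")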
keytab: Location of the Kerberos keytab file (which must be pre-uploaded to all nodes, either via the --files option of spark-submit or manually) for the JDBC client. If path information is found, Spark considers the keytab to be distributed manually; otherwise --files is assumed. If both keytab and principal are defined, Spark attempts to perform Kerberos authentication.
principal: Specifies the Kerberos principal name for the JDBC client. If both keytab and principal are defined, Spark attempts to perform Kerberos authentication.
refreshKrb5Config (default: false): This option controls whether the Kerberos configuration is refreshed for the JDBC client before a new connection is established. Set it to true if the configuration should be refreshed, otherwise leave it false. Note that if this option is set to true and multiple connections are established, a race condition can occur, for example:
- The refreshKrb5Config flag is set with security context 1
- A JDBC connection provider is used for the corresponding DBMS
- The krb5.conf is modified, but the JVM has not yet recognized that it needs to be reloaded
- Spark authenticates successfully for security context 1
- The JVM loads security context 2 from the modified krb5.conf
- Spark restores the previously saved security context 1
- The modified krb5.conf content is simply gone
connectionProvider: The name of the JDBC connection provider used to connect to this URL, e.g. db2 or mssql. It must be one of the providers loaded with the JDBC data source, and it is used to disambiguate when more than one provider can handle the specified driver and options. The selected provider must not be disabled by spark.sql.sources.disabledJdbcConnProviderList.
Note that Kerberos authentication with keytab is not always supported by the JDBC driver.
Before using the keytab and principal configuration options, please make sure the following requirements are met:
- The included JDBC driver version supports Kerberos authentication with keytab.
- There is a built-in connection provider which supports the used database.
If the requirements are not met, please consider using the JdbcConnectionProvider developer API to handle custom authentication.
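A hedged sketch of a Kerberos-authenticated read, assuming a database with a built-in connection provider; the keytab path and principal name are illustrative placeholders:

val kerberosDF = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "schema.tablename")
  .option("keytab", "/path/to/client.keytab")  // assumed path; pre-upload to all nodes or ship via --files
  .option("principal", "client@EXAMPLE.COM")   // assumed principal name
  .load()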
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
For the complete sample code, see examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala in the Spark repository.
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
For the complete sample code, see examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java in the Spark repository.
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
For the complete sample code, see examples/src/main/python/sql/datasource.py in the Spark repository.
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
For the complete example code, see examples/src/main/r/RSparkSQLExample.R in the Spark repository.
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable