JDBC To Other Databases - Spark 3.3.1 Documentation

Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD, because the results are returned as a DataFrame and can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python as it does not require the user to provide a ClassTag. (Note that this is different from the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL.)

First you need to add the JDBC driver for your specific database to the Spark classpath. For example, to connect to Postgres from the Spark shell, you would run the following command:
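(The PostgreSQL driver JAR name and version below are placeholders; substitute the driver JAR you actually downloaded.)

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar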

Spark supports the following case-insensitive options for JDBC. JDBC data source options can be set via the .option/.options methods of DataFrameReader and DataFrameWriter, or via the OPTIONS clause of CREATE TABLE USING, as shown in the examples below.

For connection properties, users can specify the JDBC connection properties in the data source options. user and password are normally provided as connection properties for logging into the data sources.
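For instance, a minimal Scala sketch (the URL and table name are placeholder assumptions) that passes user and password as connection properties rather than as individual options:

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
// read a table using the connection properties instead of .option(...) calls
val df = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)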

Each option is listed below with its name, default value, meaning, and the scope (read, write, or read/write) in which it applies.

url (default: none; scope: read/write). The JDBC URL of the form jdbc:subprotocol:subname to connect to. Source-specific connection properties can be specified in the URL, e.g. jdbc:postgresql://localhost/test?user=fred&password=secret.

dbtable (default: none; scope: read/write). The JDBC table to read from or write to. Note that when used in the read path, anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could use a subquery in parentheses. It is not allowed to specify the dbtable and query options at the same time.

query (default: none; scope: read/write). A query that is used to read data into Spark. The specified query is parenthesized and used as a subquery in the FROM clause. Spark also assigns an alias to the subquery clause. As an example, Spark issues a query of the following form to the JDBC source:

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

Below are some limitations when using this option.

  1. It is not allowed to specify the dbtable and query options at the same time.
  2. It is not allowed to specify the query and partitionColumn options at the same time. When the partitionColumn option is required, the subquery can be specified using the dbtable option instead, and partition columns can be qualified using the subquery alias provided as part of dbtable (see the sketch following this list of options).
    Example:
    spark.read.format("jdbc")
    .option("URL", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .Burden()
driver (default: none; scope: read/write). The class name of the JDBC driver to use to connect to this URL.

partitionColumn, lowerBound, upperBound (default: none; scope: read). These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading it in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Note that lowerBound and upperBound are only used to decide the partition stride, not to filter the rows in the table; all rows in the table are therefore partitioned and returned. This option applies only to reading.

numPartitions (default: none; scope: read/write). The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

queryTimeout (default: 0; scope: read/write). The number of seconds the driver will wait for a Statement object to execute, up to the given number of seconds. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout; e.g. the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch.

fetchsize (default: 0; scope: read). The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to a low fetch size (e.g. Oracle with 10 rows).

batchsize (default: 1000; scope: write). The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing.

isolationLevel (default: READ_UNCOMMITTED; scope: write). The transaction isolation level, which applies to the current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to the standard transaction isolation levels defined by JDBC's Connection object, with a default of READ_UNCOMMITTED. Please refer to the documentation of java.sql.Connection.

sessionInitStatement (default: none; scope: read). After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")

truncate (default: false; scope: write). This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g. indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. In case of failures, users should turn off the truncate option to use DROP TABLE again. Also, due to the different behavior of TRUNCATE TABLE among DBMSs, it is not always safe to use this. MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect support this, while PostgresDialect and the default JDBCDialect do not. For unknown and unsupported JDBCDialects, the user option truncate is ignored.

cascadeTruncate (default: the default cascading truncate behaviour of the JDBC database in question, specified in isCascadeTruncate of each JDBCDialect; scope: write). This is a JDBC writer related option. If enabled and supported by the JDBC database (PostgreSQL and Oracle at the moment), this option allows execution of a TRUNCATE TABLE t CASCADE (in the case of PostgreSQL a TRUNCATE TABLE ONLY t CASCADE is executed to prevent inadvertently truncating descendant tables). This affects other tables and should therefore be used with care.

createTableOptions (default: empty; scope: write). This is a JDBC writer related option. If specified, this option allows setting database-specific table and partition options when creating a table (e.g. CREATE TABLE t (name string) ENGINE=InnoDB.).

createTableColumnTypes (default: none; scope: write). The database column data types to use instead of the defaults when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g. "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid Spark SQL data types.

customSchema (default: none; scope: read). The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, and the others use the default type mapping. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of the JDBC table. Users can specify the corresponding Spark SQL data types instead of using the defaults.

pushDownPredicate (default: true; scope: read). The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark pushes down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter is pushed down to the JDBC data source and thus all filters are handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.

pushDownAggregate (default: false; scope: read). The option to enable or disable aggregate push-down into the V2 JDBC data source. The default value is false, in which case Spark does not push down aggregates to the JDBC data source. Otherwise, if set to true, aggregates are pushed down to the JDBC data source. Aggregate push-down is usually turned off when the aggregate is performed faster by Spark than by the JDBC data source. Please note that aggregates can be pushed down only if all the aggregate functions and the related filters can be pushed down. If numPartitions equals 1 or the group-by key is the same as partitionColumn, Spark pushes the aggregate down to the data source completely and does not apply a final aggregate over the data source output. Otherwise, Spark applies a final aggregate over the data source output.

pushDownLimit (default: false; scope: read). The option to enable or disable LIMIT push-down into the V2 JDBC data source. The LIMIT push-down also includes LIMIT + SORT, a.k.a. the Top-N operator. The default value is false, in which case Spark does not push down LIMIT or LIMIT with SORT to the JDBC data source. Otherwise, if set to true, LIMIT or LIMIT with SORT is pushed down to the JDBC data source. If numPartitions is greater than 1, Spark still applies LIMIT or LIMIT with SORT to the result from the data source even if LIMIT or LIMIT with SORT was pushed down. Otherwise, if LIMIT or LIMIT with SORT was pushed down and numPartitions equals 1, Spark does not apply LIMIT or LIMIT with SORT to the result from the data source.

pushDownTableSample (default: false; scope: read). The option to enable or disable TABLESAMPLE push-down into the V2 JDBC data source. The default value is false, in which case Spark does not push down TABLESAMPLE to the JDBC data source. Otherwise, if set to true, TABLESAMPLE is pushed down to the JDBC data source.

keytab (default: none; scope: read/write). Location of the Kerberos keytab file (which must be pre-uploaded to all nodes either by the --files option of spark-submit or manually) for the JDBC client. When path information is found, Spark considers the keytab distributed manually; otherwise --files is assumed. If both keytab and principal are defined, Spark tries to do Kerberos authentication.

principal (default: none; scope: read/write). Specifies the Kerberos principal name for the JDBC client. If both keytab and principal are defined, Spark tries to do Kerberos authentication.

refreshKrb5Config (default: false; scope: read/write). This option controls whether or not the Kerberos configuration for the JDBC client is refreshed before establishing a new connection. Set to true if you want to refresh the configuration, otherwise set to false. The default value is false. Note that if you set this option to true and try to establish multiple connections, a race condition can occur. One possible situation would be as follows.
  1. The refreshKrb5Config flag is set with security context 1
  2. A JDBC Connection Provider is used for the corresponding DBMS
  3. The krb5.conf is modified, but the JVM has not yet recognized that it needs to be reloaded
  4. Spark successfully authenticates to security context 1
  5. The JVM loads security context 2 from the modified krb5.conf
  6. Spark restores the previously saved security context 1
  7. The modified krb5.conf content is just gone
connectionProvider (default: none; scope: read/write). The name of the JDBC connection provider to use to connect to this URL, e.g. db2, mssql. Must be one of the providers loaded with the JDBC data source. Used to disambiguate when more than one provider can handle the specified driver and options. The selected provider must not be disabled by spark.sql.sources.disabledJdbcConnProviderList.
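As a concrete illustration of the partitioning options, and of qualifying the partition column with a subquery alias supplied through dbtable, here is a minimal Scala sketch; the URL, table name, column names, and bound values are assumptions for illustration only:

// parallel read: numPartitions queries are issued, each covering a stride of partitionColumn values
val partitionedJdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  // a parenthesized subquery with an alias is supplied via dbtable
  .option("dbtable", "(select c1, c2 from t1) t1_alias")
  // the partition column is qualified with the subquery alias provided in dbtable
  .option("partitionColumn", "t1_alias.c1")
  .option("lowerBound", "0")
  .option("upperBound", "100000")
  .option("numPartitions", "10")
  .load()

The bounds only determine the partition stride; rows of t1 outside the bounds are still read.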

Note that Kerberos authentication with keytab is not always supported by the JDBC driver.
Before using the keytab and principal configuration options, please ensure that the following requirements are met:

  1. The included JDBC driver version supports Kerberos authentication with keytab.
  2. There is a built-in connection provider which supports the used database.

If the requirements are not met, please consider using the JdbcConnectionProvider developer API to handle custom authentication.
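For reference, a minimal Scala sketch of a read that uses the keytab and principal options; the URL, table name, keytab path, and principal name are placeholder assumptions:

val kerberosJdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  // the keytab must be reachable on all nodes (pre-uploaded or distributed via --files)
  .option("keytab", "/path/to/client.keytab")
  .option("principal", "client@EXAMPLE.COM")
  .load()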


// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

For the complete sample code, see examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala in the Spark repository.


// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

For the complete sample code, see examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java in the Spark repository.

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

For the complete sample code, see examples/src/main/python/sql/datasource.py in the Spark repository.

# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
                user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename",
           user = "username", password = "password")

For the complete example code, see examples/src/main/r/RSparkSQLExample.R in the Spark repository.

CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable
