OceanBase Spark Connector has fully supported the Spark Catalog since version 1.1, providing a more convenient and efficient way to use OceanBase from Spark. With the Spark Catalog, users can access and operate OceanBase databases in a more concise and consistent manner.
- Currently only supports OceanBase MySQL mode.
- Supports Spark adaptive partitioning and parallel reading of OceanBase (via JDBC).
- Predicate pushdown support.
- Supports writing OceanBase through direct load.
- Supports writing OceanBase through JDBC.
  - For primary key tables, supports writing in upsert mode; in MySQL mode this is based on the `INSERT INTO ... ON DUPLICATE KEY UPDATE` syntax.
  - For non-primary key tables, writes through `INSERT INTO`.
- Supports managing databases and tables in OceanBase through Spark SQL, including `SHOW DATABASES`, `SHOW TABLES`, `DROP TABLE`, `DROP DATABASE`, and other statements.
- Supports CTAS syntax to create and write OceanBase tables.
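The upsert behavior for primary key tables can be illustrated with a sketch of the statement shape described above. This is an illustrative reconstruction, not the connector's internal code; the table and column names are made up.

```python
def build_upsert_sql(table, columns, key_columns):
    """Build a MySQL-mode upsert statement of the shape described above:
    INSERT ... ON DUPLICATE KEY UPDATE for primary key tables.
    Illustrative sketch only, not the connector's actual implementation."""
    placeholders = ", ".join(["?"] * len(columns))
    # Non-key columns are updated with the incoming values on key conflict.
    updates = ", ".join(
        f"{c} = VALUES({c})" for c in columns if c not in key_columns
    )
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )

# For a primary key table, conflicting rows update the non-key columns:
sql = build_upsert_sql("test.orders",
                       ["order_id", "customer_name", "price"],
                       ["order_id"])
```

For non-primary key tables, the same writer falls back to a plain `INSERT INTO` statement, since there is no key on which a conflict could occur.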
| Connector | Spark | OceanBase | Java | Scala |
|---|---|---|---|---|
| 1.1 or later versions | 3.1 ~ 3.5 | | 8 | 2.12 |
- Note: If you need a package built based on other Scala versions, you can get the package by building it from source code.
You can get the release packages at Releases Page or Maven Central.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
<version>${project.version}</version>
</dependency>

If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
<version>${project.version}</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

Of course, you can also get the package by building from source code.
- By default, it is built with Scala version 2.12.
- After successful compilation, the target JAR will be generated in the `target` directory of the module corresponding to each Spark version, e.g. spark-connector-oceanbase-3.4_2.12-1.0-SNAPSHOT.jar. Copy this file to Spark's classpath to use spark-connector-oceanbase.
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -DskipTests

- If you need a package built based on other Scala versions, refer to the command below to build based on Scala 2.13.
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -Dscala.version=2.13.15 -Dscala.binary.version=2.13 -DskipTests

Set the OceanBase Catalog related parameters when starting the Spark-SQL CLI:
./bin/spark-sql \
--conf "spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog" \
--conf "spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881" \
--conf "spark.sql.catalog.your_catalog_name.username=root@test" \
--conf "spark.sql.catalog.your_catalog_name.password=******" \
--conf "spark.sql.catalog.your_catalog_name.schema-name=test" \
--conf "spark.sql.defaultCatalog=your_catalog_name"

Alternatively, you can configure these parameters in the Spark configuration file (spark-defaults.conf by default), as shown below, and then start the Spark-SQL CLI directly.
spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog
spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881
spark.sql.catalog.your_catalog_name.username=root@test
spark.sql.catalog.your_catalog_name.password=******
spark.sql.catalog.your_catalog_name.schema-name=test
spark.sql.defaultCatalog=your_catalog_name

For the DataFrame API, you can configure the OceanBase Catalog in the following way:
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.sql.catalog.your_catalog_name", "com.oceanbase.spark.catalog.OceanBaseCatalog")
.config("spark.sql.catalog.your_catalog_name.url", "jdbc:mysql://localhost:2881")
.config("spark.sql.catalog.your_catalog_name.username", "root@test")
.config("spark.sql.catalog.your_catalog_name.password", "******")
.config("spark.sql.catalog.your_catalog_name.schema-name", "test")
.config("spark.sql.defaultCatalog", "your_catalog_name")
.getOrCreate()

-- show all databases
show databases;
-- drop database test
drop database test;
-- create database test
create database test;
-- use and switch databases
use your_oceanbase_db;
-- show tables in test
show tables;
-- create table test1
CREATE TABLE test.test1(
user_id BIGINT COMMENT 'test_for_key',
name VARCHAR(255)
)
PARTITIONED BY (bucket(16, user_id))
COMMENT 'test_for_table_create'
TBLPROPERTIES('replica_num' = 2, COMPRESSION = 'zstd_1.0');
-- drop table test1
drop table test.test1;
SELECT * FROM test.test;

Take synchronizing data from Hive to OceanBase as an example.
Create corresponding Hive tables and OceanBase tables to prepare for data synchronization
- Start spark-sql by running
${SPARK_HOME}/bin/spark-sql
CREATE TABLE spark_catalog.default.orders (
order_id INT,
order_date TIMESTAMP,
customer_name string,
price double,
product_id INT,
order_status BOOLEAN
) using parquet;
insert into spark_catalog.default.orders values
(1, now(), 'zs', 12.2, 12, true),
(2, now(), 'ls', 121.2, 12, true),
(3, now(), 'xx', 123.2, 12, true),
(4, now(), 'jac', 124.2, 12, false),
(5, now(), 'dot', 111.25, 12, true);

- Connect to OceanBase
CREATE TABLE test.orders (
order_id INT PRIMARY KEY,
order_date TIMESTAMP,
customer_name VARCHAR(225),
price double,
product_id INT,
order_status BOOLEAN
);

insert into table test.orders
select * from spark_catalog.default.orders;

Add the following direct-load related parameters to the Spark configuration file, which defaults to spark-defaults.conf, as shown below, and then restart the Spark-SQL CLI.
spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog
spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881
spark.sql.catalog.your_catalog_name.username=root@test
spark.sql.catalog.your_catalog_name.password=******
spark.sql.catalog.your_catalog_name.schema-name=test
spark.sql.defaultCatalog=your_catalog_name
# enable direct-load
spark.sql.catalog.your_catalog_name.direct-load.enabled=true
spark.sql.catalog.your_catalog_name.direct-load.host=localhost
spark.sql.catalog.your_catalog_name.direct-load.rpc-port=2882

insert into table test.orders
select * from spark_catalog.default.orders;

Precautions for direct-load:
- Table locking will occur during the direct-load job. While the table is locked:
- Data write operations and DDL changes are prohibited.
- Data queries are allowed.
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| spark.sql.catalog.your_catalog_name | Yes | | String | Sets the class name of the Catalog provider. For OceanBase, the only valid value is com.oceanbase.spark.catalog.OceanBaseCatalog. |
| spark.sql.catalog.your_catalog_name.url | Yes | | String | The JDBC URL to connect to OceanBase. |
| spark.sql.catalog.your_catalog_name.username | Yes | | String | The username. |
| spark.sql.catalog.your_catalog_name.password | Yes | | String | The password. |
| spark.sql.catalog.your_catalog_name.driver | No | | String | The class name of the JDBC driver to use to connect to this URL. |
| spark.sql.catalog.your_catalog_name.schema-name | No | | String | Sets the default schema for the OceanBase Catalog. |
| spark.sql.catalog.your_catalog_name.jdbc.fetch-size | No | 100 | Int | The JDBC fetch size, which determines how many rows to fetch per round trip. |
| spark.sql.catalog.your_catalog_name.jdbc.batch-size | No | 1024 | Int | The JDBC writing batch size, which determines how many rows to insert per round trip. |
| spark.sql.catalog.your_catalog_name.jdbc.max-records-per-partition | No | | Int | Controls the maximum number of rows a single Spark partition may hold when Spark reads from OceanBase. The default is empty, and Spark will automatically calculate a reasonable value based on the amount of data. Note: It is generally not recommended to set this parameter. |
| spark.sql.catalog.your_catalog_name.jdbc.parallel-hint-degree | No | 1 | Int | SQL statements sent by Spark to OceanBase automatically carry a PARALLEL hint. This parameter adjusts the degree of parallelism; the default value is 1. |
| spark.sql.catalog.your_catalog_name.jdbc.statistics-parallel-hint-degree | No | 4 | Int | Controls the parallelism level for statistical queries (e.g., COUNT, MIN, MAX) by adding /*+ PARALLEL(N) */ hint to generated SQL. |
| spark.sql.catalog.your_catalog_name.jdbc.partition-compute-parallelism | No | 32 | Int | Controls the parallelism level for partition computation. This parameter determines the number of threads used when computing partitions for partitioned tables (mainly through parallel SQL queries to OceanBase partition statistics). The computation task runs on the driver node. Higher values can improve performance for tables with many partitions. When setting a larger value for this parameter, appropriately increasing the CPU cores and memory of the driver node can achieve better performance. |
| spark.sql.catalog.your_catalog_name.jdbc.partition-compute-timeout-minutes | No | 10 | Int | Timeout in minutes for partition computation. This parameter controls how long to wait for partition computation to complete before throwing a timeout exception. |
| spark.sql.catalog.your_catalog_name.jdbc.query-timeout-hint-degree | No | -1 | Int | Control the query timeout by adding /*+ query_timeout(N) */ hint to the generated SQL. This parameter can be used to specify the timeout in microseconds. The default value is -1, which means that the hint is not added. |
| spark.sql.catalog.your_catalog_name.jdbc.query-hints | No | | String | Additional OceanBase query hints added to SELECT query statements. Multiple hints can be specified separated by spaces, e.g. 'READ_CONSISTENCY(WEAK) query_timeout(10000000)'. |
| spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition | No | false | Boolean | When true, primary key tables will be prohibited from using WHERE clause partitioning. |
| spark.sql.catalog.your_catalog_name.jdbc.{database}.{table}.partition-column | No | | String | Manually specifies the partition column for a primary key table. By default, one is automatically selected from the primary key columns. |
| spark.sql.catalog.your_catalog_name.jdbc.enable-autocommit | No | false | Boolean | When using jdbc to write, whether to enable autocommit for automatic transaction commit. |
| spark.sql.catalog.your_catalog_name.jdbc.use-insert-ignore | No | false | Boolean | When enabled, uses INSERT IGNORE instead of INSERT ... ON DUPLICATE KEY UPDATE for handling primary key conflicts. INSERT IGNORE will skip rows with duplicate keys and continue processing, while ON DUPLICATE KEY UPDATE will update existing rows with new values. |
| spark.sql.catalog.your_catalog_name.jdbc.upsert-by-unique-key | No | false | Boolean | When a table has both primary key and unique key index, this option controls which key to use for determining conflict detection. If set to true, uses unique key for conflict detection and updates all columns except unique key columns (including primary key columns). If set to false (default), uses primary key for conflict detection and updates all columns except primary key columns. |
| spark.sql.catalog.your_catalog_name.jdbc.optimize-decimal-string-comparison | No | false | Boolean | When this option is true, DECIMAL(P, 0) columns with precision <= 19 will be converted to BIGINT (LongType) to avoid precision loss when comparing with string literals. This optimization prevents Spark from converting String + DECIMAL to DOUBLE (which loses precision for large numbers). If false (default), DECIMAL columns will remain as DecimalType. Note: This optimization only applies to integer DECIMAL with scale = 0 and precision <= 19 (BIGINT range: -9223372036854775808 to 9223372036854775807). |
| spark.sql.catalog.your_catalog_name.string-as-varchar-length | No | 1024 | Int | Defines the length of VARCHAR type when mapping String types during table creation. Default: 1024. |
| spark.sql.catalog.your_catalog_name.enable-string-as-text | No | false | Boolean | When this option is true, Spark's string type will be converted to OceanBase's TEXT type when creating a table. |
| spark.sql.catalog.your_catalog_name.enable-spark-varchar-datatype | No | false | Boolean | When this option is true, OceanBase's VARCHAR type will be converted to Spark's VARCHAR type. Note that the Spark VARCHAR type is an experimental feature. |
| spark.sql.catalog.your_catalog_name.enable-always-nullable | No | true | Boolean | Forces all fields to be marked as nullable during schema inference, regardless of the database metadata's nullability constraints. This provides a safety net for handling data sources with incomplete metadata or implicit null values. |
| spark.sql.defaultCatalog | No | | String | Sets the Spark SQL default catalog. |
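The hint-related options above (jdbc.parallel-hint-degree, jdbc.query-timeout-hint-degree) work by injecting optimizer hints into the SQL the connector generates. The following is a hypothetical sketch of that rewriting, not the connector's actual implementation:

```python
def with_hints(sql, parallel_degree=1, query_timeout_us=-1):
    """Inject OceanBase optimizer hints into a SELECT statement,
    mimicking what the hint-degree options describe. Illustrative only;
    the connector builds its SQL internally."""
    hints = []
    if parallel_degree > 1:
        hints.append(f"PARALLEL({parallel_degree})")
    if query_timeout_us > 0:
        # query_timeout takes microseconds; -1 means no hint is added.
        hints.append(f"query_timeout({query_timeout_us})")
    if not hints:
        return sql
    # Hints are placed right after the SELECT keyword.
    return sql.replace("SELECT", f"SELECT /*+ {' '.join(hints)} */", 1)
```

With `parallel_degree=4`, `SELECT * FROM t` becomes `SELECT /*+ PARALLEL(4) */ * FROM t`, which asks OceanBase to execute the scan with parallel workers.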
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| spark.sql.catalog.your_catalog_name.direct-load.enabled | No | false | Boolean | Enable direct-load writing. |
| spark.sql.catalog.your_catalog_name.direct-load.host | No | | String | Hostname used in direct-load. |
| spark.sql.catalog.your_catalog_name.direct-load.rpc-port | No | 2882 | Integer | RPC port used in direct-load. |
| spark.sql.catalog.your_catalog_name.direct-load.username | No | | String | The direct-load username. If this configuration is not specified, the JDBC username is used. |
| spark.sql.catalog.your_catalog_name.direct-load.odp-mode | No | false | Boolean | Whether to use ODP proxy for direct-load. When set to true, it will connect through ODP proxy (typically port 2885) and pass the full username format (e.g., user@tenant#cluster); when set to false (default), it will connect directly to OBServer (typically port 2882). |
| spark.sql.catalog.your_catalog_name.direct-load.parallel | No | 8 | Integer | The parallelism of the server-side direct-load task. This parameter determines how much CPU the server uses to process this import task. |
| spark.sql.catalog.your_catalog_name.direct-load.batch-size | No | 10240 | Integer | The size of the batch that is written to OceanBase at one time. |
| spark.sql.catalog.your_catalog_name.direct-load.max-error-rows | No | 0 | Long | Maximum tolerable number of error rows. |
| spark.sql.catalog.your_catalog_name.direct-load.timeout | No | 7d | Duration | The timeout for direct-load task. |
| spark.sql.catalog.your_catalog_name.direct-load.heartbeat-timeout | No | 60s | Duration | Client heartbeat timeout in direct-load task. |
| spark.sql.catalog.your_catalog_name.direct-load.heartbeat-interval | No | 10s | Duration | Client heartbeat interval in direct-load task. |
| spark.sql.catalog.your_catalog_name.direct-load.load-method | No | full | String | The direct-load load method: full, inc, inc_replace. |
| spark.sql.catalog.your_catalog_name.direct-load.dup-action | No | REPLACE | String | Action when there is duplicated record of direct-load task. Can be STOP_ON_DUP, REPLACE or IGNORE. |
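Duration-typed options such as `timeout=7d` and `heartbeat-interval=10s` use unit-suffixed values. A minimal sketch of how such values translate into seconds (illustrative only; the connector's own parser may accept additional formats):

```python
def duration_to_seconds(value):
    """Convert a unit-suffixed duration string like '7d' or '60s'
    to seconds. Supports s/m/h/d suffixes; illustrative sketch only."""
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    number, unit = value[:-1], value[-1]
    if unit not in units:
        raise ValueError(f"unsupported duration unit: {unit}")
    return int(number) * units[unit]
```

For example, the default direct-load timeout of 7d corresponds to 604800 seconds, and the heartbeat interval of 10s to 10 seconds.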
This section describes the mapping between OceanBase data types and Spark SQL data types when reading from and writing to OceanBase.
Type mapping when reading data from OceanBase MySQL mode:
| OceanBase Type | Spark SQL Type | Description |
|---|---|---|
| Basic Data Types | ||
| BIT | LongType | When size != 1 |
| BIT | BooleanType | When size = 1 |
| TINYINT | BooleanType | |
| SMALLINT | ShortType | |
| INT / INTEGER | IntegerType | |
| BIGINT | LongType | |
| FLOAT | FloatType | |
| DOUBLE | DoubleType | |
| DECIMAL(p,s) | DecimalType(p,s) | |
| CHAR(n) | StringType | |
| VARCHAR(n) | StringType | |
| TEXT | StringType | |
| BINARY | BinaryType | |
| DATE | DateType | |
| DATETIME / TIMESTAMP | TimestampType | |
| Complex Data Types | ||
| ARRAY(type) | ArrayType(corresponding type) | Supports up to 6 levels of nesting. Supported element types: INT, BIGINT, FLOAT, DOUBLE, BOOLEAN, STRING, etc. |
| VECTOR(n) | ArrayType(FloatType) | n represents the vector dimension |
| MAP(keyType, valueType) | MapType(keyType, valueType) | |
| JSON | StringType | JSON data is read as string |
| ENUM(...) | StringType | |
| SET(...) | StringType | |
Type mapping when writing data to existing OceanBase MySQL mode tables:
| Spark SQL Type | OceanBase Type | Description |
|---|---|---|
| Basic Data Types | ||
| BooleanType | BOOLEAN / TINYINT | |
| ByteType | TINYINT | |
| ShortType | SMALLINT | |
| IntegerType | INT | |
| LongType | BIGINT | |
| FloatType | FLOAT | |
| DoubleType | DOUBLE | |
| DecimalType(p,s) | DECIMAL(p,s) | |
| StringType | CHAR / VARCHAR / TEXT | Depends on the actual target column type |
| BinaryType | BINARY | |
| DateType | DATE | |
| TimestampType | DATETIME / TIMESTAMP | |
| Complex Data Types | ||
| ArrayType(IntegerType) | INT[] / ARRAY(INT) | Table must be pre-created with ARRAY type columns |
| ArrayType(FloatType) | FLOAT[] / VECTOR(n) | Can write to VECTOR or FLOAT ARRAY columns |
| MapType(keyType, valueType) | MAP(keyType, valueType) | Table must be pre-created with MAP type columns |
| StringType | JSON | JSON data is written as string |
| StringType | ENUM / SET | Written string values must conform to ENUM/SET definition |
Type mapping when reading data from OceanBase Oracle mode:
| OceanBase Type | Spark SQL Type | Description |
|---|---|---|
| NUMBER | DecimalType | When precision and scale are not specified |
| NUMBER(p,s) | DecimalType(p,s) | |
| BINARY_FLOAT | FloatType | |
| BINARY_DOUBLE | DoubleType | |
| VARCHAR2(n) | StringType | |
| CLOB | StringType | |
| RAW(n) | BinaryType | |
| DATE | DateType | |
| TIMESTAMP | TimestampType | |
| TIMESTAMP WITH TIME ZONE | TimestampType | Supported under specific timezone configurations |
Type mapping when writing data to existing OceanBase Oracle mode tables:
| Spark SQL Type | OceanBase Type | Description |
|---|---|---|
| BooleanType | NUMBER(1) | |
| ByteType | NUMBER(3) | |
| ShortType | NUMBER(5) | |
| IntegerType | NUMBER(10) | |
| LongType | NUMBER(19) | |
| FloatType | BINARY_FLOAT | |
| DoubleType | BINARY_DOUBLE | |
| DecimalType(p,s) | NUMBER(p,s) | |
| StringType | VARCHAR2 / CLOB | Depends on the actual target column type |
| BinaryType | RAW | |
| DateType | DATE | |
| TimestampType | TIMESTAMP |
- Complex type tables must be pre-created: Create tables with complex types directly in OceanBase using SQL before reading or writing data through Spark.
- Nested array limitation: ARRAY types support up to 6 levels of nesting, such as INT[][] or INT[][][].
- JSON type handling: JSON data is represented as StringType in Spark. Ensure the string content is valid JSON format when writing.
- ENUM and SET types: Represented as StringType in Spark. When writing, values must conform to the enumeration or set values defined in the table.
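Because JSON columns are exchanged as plain StringType values, it can help to validate rows before writing so that invalid JSON is caught on the Spark side rather than by the server. A minimal pre-write check, using Python's standard json module (illustrative; not part of the connector):

```python
import json

def is_valid_json(s):
    """Return True if s parses as JSON, meaning it can safely be
    written to an OceanBase JSON column as a string."""
    try:
        json.loads(s)
        return True
    except (ValueError, TypeError):
        return False

# Rows failing this check would be rejected by the JSON column on write.
rows = ['{"a": 1}', '{a: 1}', '[1, 2, 3]']
valid = [r for r in rows if is_valid_json(r)]
```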