OceanBase Spark Catalog


OceanBase Spark Connector has fully supported Spark Catalog since version 1.1, which provides a more convenient and efficient way to use OceanBase in Spark. With Spark Catalog, users can access and operate OceanBase databases in a more concise and consistent manner.

Currently supported features

  • Currently only OceanBase MySQL mode is supported.
  • Supports Spark adaptive partitioning and parallel reading from OceanBase (via JDBC).
    • Supports predicate pushdown.
  • Supports writing to OceanBase through direct load.
  • Supports writing to OceanBase through JDBC.
    • For primary key tables, supports writing in upsert mode.
      • In MySQL mode this is based on the INSERT INTO ... ON DUPLICATE KEY UPDATE syntax.
    • For non-primary key tables, writes through INSERT INTO.
  • Supports managing databases and tables in OceanBase through Spark SQL, including show databases, show tables, drop table, drop database and other statements.
    • Supports CTAS syntax to create and write to OceanBase tables.
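As a hedged illustration of the CTAS support above, assuming a catalog named ob configured for OceanBase and an existing Spark source table (both names are hypothetical):

```sql
-- Create an OceanBase table and populate it from a query in one statement.
CREATE TABLE ob.test.orders_copy AS
SELECT * FROM spark_catalog.default.orders;
```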

Version compatibility

| Connector | Spark | OceanBase | Java | Scala |
|-----------|-------|-----------|------|-------|
| 1.1 or later | 3.1 ~ 3.5 | JDBC: 3.x, 4.x; Direct Load: 4.2.x or later | 8 | 2.12 |

  • Note: If you need a package built for another Scala version, you can obtain it by building from source code.

Get the package

You can get the release packages from the Releases page or from Maven Central.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
    <version>${project.version}</version>
</dependency>

If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

Of course, you can also get the package by building from source code.

  • By default, it is built with Scala 2.12.
  • After a successful build, the target JAR is generated in the target directory of the module corresponding to each Spark version, e.g. spark-connector-oceanbase-3.4_2.12-1.0-SNAPSHOT.jar. Copy this file to Spark's classpath to use spark-connector-oceanbase.
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -DskipTests
  • If you need a package built based on other Scala versions, refer to the command below to build based on Scala 2.13.
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -Dscala.version=2.13.15 -Dscala.binary.version=2.13 -DskipTests

How to configure

Set OceanBase Catalog related parameters when starting Spark-SQL CLI

./bin/spark-sql \
--conf "spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog" \
--conf "spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881" \
--conf "spark.sql.catalog.your_catalog_name.username=root@test" \
--conf "spark.sql.catalog.your_catalog_name.password=******" \
--conf "spark.sql.catalog.your_catalog_name.schema-name=test" \
--conf "spark.sql.defaultCatalog=your_catalog_name"

You can also configure the relevant parameters in the Spark configuration file (spark-defaults.conf by default), as shown below, and then start the Spark-SQL CLI directly.

spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog
spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881
spark.sql.catalog.your_catalog_name.username=root@test
spark.sql.catalog.your_catalog_name.password=******
spark.sql.catalog.your_catalog_name.schema-name=test
spark.sql.defaultCatalog=your_catalog_name

For the DataFrame API, you can configure the OceanBase Catalog as follows:

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.catalog.your_catalog_name", "com.oceanbase.spark.catalog.OceanBaseCatalog")
  .config("spark.sql.catalog.your_catalog_name.url", "jdbc:mysql://localhost:2881")
  .config("spark.sql.catalog.your_catalog_name.username", "root@test")
  .config("spark.sql.catalog.your_catalog_name.password", "******")
  .config("spark.sql.catalog.your_catalog_name.schema-name", "test")
  .config("spark.sql.defaultCatalog", "your_catalog_name")
  .getOrCreate()

Usage examples

Manage databases and tables in OceanBase through Spark-SQL

-- show all databases
show databases;

-- drop database test
drop database test;

-- create database test
create database test;

-- switch the current database
use test;

-- show tables in the current database
show tables;

-- create table test1
CREATE TABLE test.test1(
  user_id BIGINT COMMENT 'test_for_key',
  name VARCHAR(255)
)
PARTITIONED BY (bucket(16, user_id))
COMMENT 'test_for_table_create'
TBLPROPERTIES('replica_num' = 2, COMPRESSION = 'zstd_1.0');

-- drop table test1
drop table test.test1;

Read

SELECT * FROM test.test;
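Filters are pushed down to OceanBase where possible (see the predicate pushdown feature above). A minimal sketch, assuming the table has an order_id column (hypothetical):

```sql
-- The WHERE condition can be pushed into the JDBC query sent to OceanBase,
-- so filtering happens on the server side rather than in Spark.
SELECT * FROM test.test WHERE order_id > 100;
```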

Write

Take synchronizing data from Hive to OceanBase as an example.

Preparation

Create the corresponding Hive and OceanBase tables to prepare for data synchronization.

  • Start spark-sql by running ${SPARK_HOME}/bin/spark-sql
CREATE TABLE spark_catalog.default.orders (
  order_id      INT,
  order_date    TIMESTAMP,
  customer_name STRING,
  price         DOUBLE,
  product_id    INT,
  order_status  BOOLEAN
) USING parquet;

insert into spark_catalog.default.orders values
(1, now(), 'zs', 12.2, 12, true),
(2, now(), 'ls', 121.2, 12, true),
(3, now(), 'xx', 123.2, 12, true),
(4, now(), 'jac', 124.2, 12, false),
(5, now(), 'dot', 111.25, 12, true);
  • Connect to OceanBase and create the target table
CREATE TABLE test.orders (
  order_id      INT PRIMARY KEY,
  order_date    TIMESTAMP,
  customer_name VARCHAR(255),
  price         DOUBLE,
  product_id    INT,
  order_status  BOOLEAN
);

Via JDBC

insert into table test.orders
select * from spark_catalog.default.orders;

Via Direct-Load

Add the following direct-load parameters to the Spark configuration file (spark-defaults.conf by default), as shown below, and then restart the Spark-SQL CLI.

spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog
spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881
spark.sql.catalog.your_catalog_name.username=root@test
spark.sql.catalog.your_catalog_name.password=******
spark.sql.catalog.your_catalog_name.schema-name=test
spark.sql.defaultCatalog=your_catalog_name

# enable direct-load
spark.sql.catalog.your_catalog_name.direct-load.enabled=true
spark.sql.catalog.your_catalog_name.direct-load.host=localhost
spark.sql.catalog.your_catalog_name.direct-load.rpc-port=2882

Then execute the insert statement in the Spark-SQL CLI:

insert into table test.orders
select * from spark_catalog.default.orders;

Precautions for direct-load:

  • Table locking will occur during the direct-load job. While the table is locked:
    • Data write operations and DDL changes are prohibited.
    • Data queries are allowed.

Configuration

General configuration

Option Required Default Type Description
spark.sql.catalog.your_catalog_name Yes String Sets the class name of the Catalog provider. For OceanBase, the only valid value is com.oceanbase.spark.catalog.OceanBaseCatalog.
spark.sql.catalog.your_catalog_name.url Yes String The JDBC url to connect to OceanBase.
spark.sql.catalog.your_catalog_name.username Yes String The username.
spark.sql.catalog.your_catalog_name.password Yes String The password.
spark.sql.catalog.your_catalog_name.driver No String The class name of the JDBC driver to use to connect to this URL.
spark.sql.catalog.your_catalog_name.schema-name No String Set the default schema for the OceanBase Catalog.
spark.sql.catalog.your_catalog_name.jdbc.fetch-size No 100 Int The JDBC fetch size, which determines how many rows to fetch per round trip.
spark.sql.catalog.your_catalog_name.jdbc.batch-size No 1024 Int The JDBC writing batch size, which determines how many rows to insert per round trip.
spark.sql.catalog.your_catalog_name.jdbc.max-records-per-partition No Int Controls the maximum number of rows per Spark partition when Spark reads OceanBase. The default is empty, in which case Spark automatically calculates a reasonable value based on the data volume. Note: it is generally not recommended to set this parameter.
spark.sql.catalog.your_catalog_name.jdbc.parallel-hint-degree No 1 Int The SQL statements sent by Spark to OceanBase automatically carry a PARALLEL hint. This parameter adjusts the parallelism; the default value is 1.
spark.sql.catalog.your_catalog_name.jdbc.statistics-parallel-hint-degree No 4 Int Controls the parallelism level for statistical queries (e.g., COUNT, MIN, MAX) by adding /*+ PARALLEL(N) */ hint to generated SQL.
spark.sql.catalog.your_catalog_name.jdbc.partition-compute-parallelism No 32 Int Controls the parallelism level for partition computation. This parameter determines the number of threads used when computing partitions for partitioned tables (mainly through parallel SQL queries to OceanBase partition statistics). The computation task runs on the driver node. Higher values can improve performance for tables with many partitions. When setting a larger value for this parameter, appropriately increasing the CPU cores and memory of the driver node can achieve better performance.
spark.sql.catalog.your_catalog_name.jdbc.partition-compute-timeout-minutes No 10 Int Timeout in minutes for partition computation. This parameter controls how long to wait for partition computation to complete before throwing a timeout exception.
spark.sql.catalog.your_catalog_name.jdbc.query-timeout-hint-degree No -1 Int Control the query timeout by adding /*+ query_timeout(N) */ hint to the generated SQL. This parameter can be used to specify the timeout in microseconds. The default value is -1, which means that the hint is not added.
spark.sql.catalog.your_catalog_name.jdbc.query-hints No String Additional OceanBase query hints added to SELECT query statements. Multiple hints can be specified separated by spaces, e.g. 'READ_CONSISTENCY(WEAK) query_timeout(10000000)'.
spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition No false Boolean When true, primary key tables will be prohibited from using WHERE clause partitioning.
spark.sql.catalog.your_catalog_name.jdbc.{database}.{table}.partition-column No String You can manually specify the primary key table partition column, and by default, one will be automatically selected from the primary key columns.
spark.sql.catalog.your_catalog_name.jdbc.enable-autocommit No false Boolean Whether to enable autocommit (automatic transaction commit) when writing via JDBC.
spark.sql.catalog.your_catalog_name.jdbc.use-insert-ignore No false Boolean When enabled, uses INSERT IGNORE instead of INSERT ... ON DUPLICATE KEY UPDATE for handling primary key conflicts. INSERT IGNORE will skip rows with duplicate keys and continue processing, while ON DUPLICATE KEY UPDATE will update existing rows with new values.
spark.sql.catalog.your_catalog_name.jdbc.upsert-by-unique-key No false Boolean When a table has both primary key and unique key index, this option controls which key to use for determining conflict detection. If set to true, uses unique key for conflict detection and updates all columns except unique key columns (including primary key columns). If set to false (default), uses primary key for conflict detection and updates all columns except primary key columns.
spark.sql.catalog.your_catalog_name.jdbc.optimize-decimal-string-comparison No false Boolean When this option is true, DECIMAL(P, 0) columns with precision <= 19 will be converted to BIGINT (LongType) to avoid precision loss when comparing with string literals. This optimization prevents Spark from converting String + DECIMAL to DOUBLE (which loses precision for large numbers). If false (default), DECIMAL columns will remain as DecimalType. Note: This optimization only applies to integer DECIMAL with scale = 0 and precision <= 19 (BIGINT range: -9223372036854775808 to 9223372036854775807).
spark.sql.catalog.your_catalog_name.string-as-varchar-length No 1024 Int Defines the length of VARCHAR type when mapping String types during table creation. Default: 1024.
spark.sql.catalog.your_catalog_name.enable-string-as-text No false Boolean When this option is true, Spark's string type will be mapped to OceanBase's TEXT type when creating a table.
spark.sql.catalog.your_catalog_name.enable-spark-varchar-datatype No false Boolean When this option is true, OceanBase's VARCHAR type will be mapped to Spark's VARCHAR type. Note that the Spark VARCHAR type is an experimental feature.
spark.sql.catalog.your_catalog_name.enable-always-nullable No true Boolean Forces all fields to be marked as nullable during schema inference, regardless of the database metadata's nullability constraints. This provides a safety net for handling data sources with incomplete metadata or implicit null values.
spark.sql.defaultCatalog No String Set the Spark SQL default catalog.
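As an example of combining a few of the options above in spark-defaults.conf (the values are illustrative, not recommendations):

```
# larger fetch/batch sizes and a PARALLEL hint for read/write throughput
spark.sql.catalog.your_catalog_name.jdbc.fetch-size=500
spark.sql.catalog.your_catalog_name.jdbc.batch-size=4096
spark.sql.catalog.your_catalog_name.jdbc.parallel-hint-degree=4
# additional hints appended to generated SELECT statements
spark.sql.catalog.your_catalog_name.jdbc.query-hints=READ_CONSISTENCY(WEAK)
```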

Direct load configuration

Option Required Default Type Description
spark.sql.catalog.your_catalog_name.direct-load.enabled No false Boolean Enable direct-load writing.
spark.sql.catalog.your_catalog_name.direct-load.host No String Hostname used in direct-load.
spark.sql.catalog.your_catalog_name.direct-load.rpc-port No 2882 Integer RPC port used in direct-load.
spark.sql.catalog.your_catalog_name.direct-load.username No String The direct-load's username. If this configuration is not specified, the jdbc username is used.
spark.sql.catalog.your_catalog_name.direct-load.odp-mode No false Boolean Whether to use ODP proxy for direct-load. When set to true, it will connect through ODP proxy (typically port 2885) and pass the full username format (e.g., user@tenant#cluster); when set to false (default), it will connect directly to OBServer (typically port 2882).
spark.sql.catalog.your_catalog_name.direct-load.parallel No 8 Integer The parallel of the direct-load server. This parameter determines how much CPU resources the server uses to process this import task.
spark.sql.catalog.your_catalog_name.direct-load.batch-size No 10240 Integer The size of the batch that is written to the OceanBase at one time.
spark.sql.catalog.your_catalog_name.direct-load.max-error-rows No 0 Long Maximum tolerable number of error rows.
spark.sql.catalog.your_catalog_name.direct-load.timeout No 7d Duration The timeout for direct-load task.
spark.sql.catalog.your_catalog_name.direct-load.heartbeat-timeout No 60s Duration Client heartbeat timeout in direct-load task.
spark.sql.catalog.your_catalog_name.direct-load.heartbeat-interval No 10s Duration Client heartbeat interval in direct-load task.
spark.sql.catalog.your_catalog_name.direct-load.load-method No full String The direct-load load method: full, inc, inc_replace.
  • full: full direct-load, the default value.
  • inc: normal incremental direct-load; primary key conflict checks are performed. Supported by observer 4.3.2 and above; direct-load.dup-action REPLACE is not supported for now.
  • inc_replace: special replace-mode incremental direct-load; no primary key conflict check is performed and old data is overwritten directly (equivalent to the effect of replace). The direct-load.dup-action parameter is ignored. Supported by observer 4.3.2 and above.
spark.sql.catalog.your_catalog_name.direct-load.dup-action No REPLACE String Action when there is duplicated record of direct-load task. Can be STOP_ON_DUP, REPLACE or IGNORE.
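For example, a spark-defaults.conf fragment enabling incremental direct-load in replace mode (requires observer 4.3.2 or above, per the load-method description; host and port are placeholders):

```
spark.sql.catalog.your_catalog_name.direct-load.enabled=true
spark.sql.catalog.your_catalog_name.direct-load.host=localhost
spark.sql.catalog.your_catalog_name.direct-load.rpc-port=2882
# overwrite existing rows without primary key conflict checks
spark.sql.catalog.your_catalog_name.direct-load.load-method=inc_replace
```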

Data Type Mapping

This section describes the mapping between OceanBase data types and Spark SQL data types when reading from and writing to OceanBase.

MySQL Mode

Reading Data (OceanBase → Spark)

Type mapping when reading data from OceanBase MySQL mode:

OceanBase Type Spark SQL Type Description
Basic Data Types
BIT LongType When size != 1
BIT BooleanType When size = 1
TINYINT BooleanType
SMALLINT ShortType
INT / INTEGER IntegerType
BIGINT LongType
FLOAT FloatType
DOUBLE DoubleType
DECIMAL(p,s) DecimalType(p,s)
CHAR(n) StringType
VARCHAR(n) StringType
TEXT StringType
BINARY BinaryType
DATE DateType
DATETIME / TIMESTAMP TimestampType
Complex Data Types
ARRAY(type) ArrayType(corresponding type) Supports up to 6 levels of nesting. Supported element types: INT, BIGINT, FLOAT, DOUBLE, BOOLEAN, STRING, etc.
VECTOR(n) ArrayType(FloatType) n represents the vector dimension
MAP(keyType, valueType) MapType(keyType, valueType)
JSON StringType JSON data is read as string
ENUM(...) StringType
SET(...) StringType

Writing Data (Spark → OceanBase)

Type mapping when writing data to existing OceanBase MySQL mode tables:

Spark SQL Type OceanBase Type Description
Basic Data Types
BooleanType BOOLEAN / TINYINT
ByteType TINYINT
ShortType SMALLINT
IntegerType INT
LongType BIGINT
FloatType FLOAT
DoubleType DOUBLE
DecimalType(p,s) DECIMAL(p,s)
StringType CHAR / VARCHAR / TEXT Depends on the actual target column type
BinaryType BINARY
DateType DATE
TimestampType DATETIME / TIMESTAMP
Complex Data Types
ArrayType(IntegerType) INT[] / ARRAY(INT) Table must be pre-created with ARRAY type columns
ArrayType(FloatType) FLOAT[] / VECTOR(n) Can write to VECTOR or FLOAT ARRAY columns
MapType(keyType, valueType) MAP(keyType, valueType) Table must be pre-created with MAP type columns
StringType JSON JSON data is written as string
StringType ENUM / SET Written string values must conform to ENUM/SET definition

Oracle Mode

Reading Data (OceanBase → Spark)

Type mapping when reading data from OceanBase Oracle mode:

OceanBase Type Spark SQL Type Description
NUMBER DecimalType When precision and scale are not specified
NUMBER(p,s) DecimalType(p,s)
BINARY_FLOAT FloatType
BINARY_DOUBLE DoubleType
VARCHAR2(n) StringType
CLOB StringType
RAW(n) BinaryType
DATE DateType
TIMESTAMP TimestampType
TIMESTAMP WITH TIME ZONE TimestampType Supported under specific timezone configurations

Writing Data (Spark → OceanBase)

Type mapping when writing data to existing OceanBase Oracle mode tables:

Spark SQL Type OceanBase Type Description
BooleanType NUMBER(1)
ByteType NUMBER(3)
ShortType NUMBER(5)
IntegerType NUMBER(10)
LongType NUMBER(19)
FloatType BINARY_FLOAT
DoubleType BINARY_DOUBLE
DecimalType(p,s) NUMBER(p,s)
StringType VARCHAR2 / CLOB Depends on the actual target column type
BinaryType RAW
DateType DATE
TimestampType TIMESTAMP

Important Notes

  1. Complex type tables must be pre-created: Create tables with complex types directly in OceanBase using SQL before reading or writing data through Spark.
  2. Nested array limitation: ARRAY types support up to 6 levels of nesting, such as INT[][] or INT[][][].
  3. JSON type handling: JSON data is represented as StringType in Spark. Ensure the string content is valid JSON format when writing.
  4. ENUM and SET types: Represented as StringType in Spark. When writing, values must conform to the enumeration or set values defined in the table.
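To illustrate note 1, a table with complex types might be pre-created directly in OceanBase (MySQL mode) before accessing it from Spark. This is a sketch only: the exact ARRAY/VECTOR DDL syntax and the required OceanBase version should be checked against the OceanBase documentation, and all names are hypothetical:

```sql
-- Pre-create the table in OceanBase itself, not through Spark.
CREATE TABLE test.items (
  id        BIGINT PRIMARY KEY,
  tags      ARRAY(INT),    -- element types per the mapping table above
  embedding VECTOR(3),     -- read into Spark as ArrayType(FloatType)
  attrs     JSON           -- read/written as StringType in Spark
);
```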