English | 简体中文
OceanBase Spark Connector 从 1.1 版本开始全面支持 Spark Catalog,这为用户在 Spark 中使用 OceanBase 提供了更便捷和高效的解决方案。 通过使用 Spark Catalog,用户能够以更加简洁和一致的方式访问和操作 OceanBase 数据库。
- 目前仅支持OceanBase MySQL模式。
- 支持Spark自适应分区并行读取OceanBase(基于JDBC)。
- 谓词下推支持
- 支持通过旁路导入的方式写OceanBase。
- 支持通过JDBC的方式写OceanBase。
- 对于主键表,支持以upsert的方式写入。
- MySQL模式基于:
INSERT INTO ... ON DUPLICATE KEY UPDATE语法。
- MySQL模式基于:
- 对于非主键表,则通过
INSERT INTO写入。
- 对于主键表,支持以upsert的方式写入。
- 支持通过Spark-SQL管理OceanBase中的数据库和表,包括:show databases、show tables、drop table、drop database等语法支持。
- 支持CTAS语法创建和写入OceanBase表。
| Connector | Spark | OceanBase | Java | Scala |
|---|---|---|---|---|
| 1.1 及以后的版本 | 3.1 ~ 3.5 |
|
8 | 2.12 |
- 注意:如果需要基于其他 scala 版本构建的程序包, 您可以通过源码构建的方式获得程序包
您可以在 Releases 页面 或者 Maven 中央仓库 找到正式的发布版本。
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
<version>${project.version}</version>
</dependency>如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定:
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
<version>${project.version}</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>当然您也可以通过源码构建的方式获得程序包。
- 默认以scala 2.12版本进行构建
- 编译成功后,会在各个版本对应的模块下的target目录生成目标 jar 包,如:spark-connector-oceanbase-3.4_2.12-1.0-SNAPSHOT.jar。 将此文件复制到 Spark 的 ClassPath 中即可使用 spark-connector-oceanbase。
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -DskipTests- 如果需要其他 scala 版本,请参考下面以 scala 2.13版本构建命令
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -Dscala.version=2.13.15 -Dscala.binary.version=2.13 -DskipTests启动 Spark-SQL CLI时设置OceanBase Catalog相关参数
./bin/spark-sql \
--conf "spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog" \
--conf "spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881" \
--conf "spark.sql.catalog.your_catalog_name.username=root@test" \
--conf "spark.sql.catalog.your_catalog_name.password=******" \
--conf "spark.sql.catalog.your_catalog_name.schema-name=test" \
--conf "spark.sql.defaultCatalog=your_catalog_name"当然也可以将相关参数配置到spark的配置文件中,默认为spark-defaults.conf,如下所示,然后直接启动 Spark-SQL CLI.
spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog
spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881
spark.sql.catalog.your_catalog_name.username=root@test
spark.sql.catalog.your_catalog_name.password=******
spark.sql.catalog.your_catalog_name.schema-name=test
spark.sql.defaultCatalog=your_catalog_name对于DataFrame API可以通过下述方式配置使用OceanBase Catalog:
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.sql.catalog.your_catalog_name", "com.oceanbase.spark.catalog.OceanBaseCatalog")
.config("spark.sql.catalog.your_catalog_name.url", "jdbc:mysql://localhost:2881")
.config("spark.sql.catalog.your_catalog_name.username", "root@test")
.config("spark.sql.catalog.your_catalog_name.password", "******")
.config("spark.sql.catalog.your_catalog_name.schema-name", "test")
.config("spark.sql.defaultCatalog", "your_catalog_name")
.getOrCreate()-- show all databases
show databases;
-- drop database test
drop database test;
-- create database test
create database test;
-- use and switch databases
use your_oceanbase_db;
-- show tables in test
show tables;
-- create table test1
CREATE TABLE test.test1(
user_id BIGINT COMMENT 'test_for_key',
name VARCHAR(255)
)
PARTITIONED BY (bucket(16, user_id))
COMMENT 'test_for_table_create'
TBLPROPERTIES('replica_num' = 2, COMPRESSION = 'zstd_1.0');
-- drop table test1
drop table test.test1;
SELECT * FROM test.test;以从Hive同步数据到OceanBase为例
创建对应的Hive表和OceanBase表,为数据同步做准备
- 通过${SPARK_HOME}/bin/spark-sql命令,开启spark-sql
CREATE TABLE spark_catalog.default.orders (
order_id INT,
order_date TIMESTAMP,
customer_name string,
price double,
product_id INT,
order_status BOOLEAN
) using parquet;
insert into spark_catalog.default.orders values
(1, now(), 'zs', 12.2, 12, true),
(2, now(), 'ls', 121.2, 12, true),
(3, now(), 'xx', 123.2, 12, true),
(4, now(), 'jac', 124.2, 12, false),
(5, now(), 'dot', 111.25, 12, true);- 连接到OceanBase
CREATE TABLE test.orders (
order_id INT PRIMARY KEY,
order_date TIMESTAMP,
customer_name VARCHAR(225),
price double,
product_id INT,
order_status BOOLEAN
);insert into table test.orders
select * from spark_catalog.default.orders;将下列旁路导入相关参数添加到spark的配置文件中,默认为spark-defaults.conf,如下所示,然后重新启动 Spark-SQL CLI.
spark.sql.catalog.your_catalog_name=com.oceanbase.spark.catalog.OceanBaseCatalog
spark.sql.catalog.your_catalog_name.url=jdbc:mysql://localhost:2881
spark.sql.catalog.your_catalog_name.username=root@test
spark.sql.catalog.your_catalog_name.password=******
spark.sql.catalog.your_catalog_name.schema-name=test
spark.sql.defaultCatalog=your_catalog_name
# 旁路导入相关参数
spark.sql.catalog.your_catalog_name.direct-load.enabled=true;
spark.sql.catalog.your_catalog_name.direct-load.host=localhost;
spark.sql.catalog.your_catalog_name.direct-load.rpc-port=2882;insert into table test.orders
select * from spark_catalog.default.orders;旁路导入注意事项:
- 旁路导入任务期间会锁表,锁表期间不能写入数据、不能进行DDL变更,但可以进行数据查询。
| 参数名 | 是否必须 | 默认值 | 类型 | 描述 |
|---|---|---|---|---|
| spark.sql.catalog.your_catalog_name | 是 | String | 设置 Catalog 提供者的类名, 对于 OceanBase 来说唯一的有效值为 com.oceanbase.spark.catalog.OceanBaseCatalog。 | |
| spark.sql.catalog.your_catalog_name.url | 是 | String | 连接到OceanBase的 JDBC url。 | |
| spark.sql.catalog.your_catalog_name.username | 是 | String | 用户名。 | |
| spark.sql.catalog.your_catalog_name.password | 是 | String | 密码。 | |
| spark.sql.catalog.your_catalog_name.driver | 否 | String | 用于连接到此 URL 的 JDBC 驱动程序的类名。 | |
| spark.sql.catalog.your_catalog_name.schema-name | 否 | String | 设置该OceanBase Catalog默认 schema。 | |
| spark.sql.catalog.your_catalog_name.jdbc.fetch-size | 否 | 100 | Int | JDBC 读取时抓取大小,决定每次数据库往返获取的数据行数。 |
| spark.sql.catalog.your_catalog_name.jdbc.batch-size | 否 | 1024 | Int | JDBC 写入批处理大小,决定每次数据库往返批量插入的数据行数。 |
| spark.sql.catalog.your_catalog_name.jdbc.max-records-per-partition | 否 | Int | 控制Spark读取OB时,最多多少条数据作为一个Spark分区。默认为空,此时Spark会根据数据量自动计算出一个合理的数值。注意:一般情况下不建议设置该参数。 | |
| spark.sql.catalog.your_catalog_name.jdbc.parallel-hint-degree | 否 | 1 | Int | Spark读取OB时,Spark下发到OB的SQL会自动带上PARALLEL Hint。通过该参数可以调整其并行度,默认为1。 |
| spark.sql.catalog.your_catalog_name.jdbc.statistics-parallel-hint-degree | 否 | 4 | Int | 通过向生成的 SQL 添加 /*+ PARALLEL(N) */ hint 来控制统计查询(例如 COUNT、MIN、MAX)的并行级别。 |
| spark.sql.catalog.your_catalog_name.jdbc.partition-compute-parallelism | 否 | 32 | Int | 控制分区计算的并行级别。此参数确定计算分区表分区时使用的线程数,主要通过并行 SQL 查询 OceanBase 分区统计信息来实现。该计算任务在 driver 节点运行,对于分区数量较多的表,设置更高的值可以显著提升性能。当指定的该参数值较大的时候,适当调大 driver 节点的 CPU 核数和内存,可以取得更好的性能。 |
| spark.sql.catalog.your_catalog_name.jdbc.partition-compute-timeout-minutes | 否 | 10 | Int | 分区计算的超时时间,单位为分钟。此参数控制在抛出超时异常之前等待分区计算完成的时长。 |
| spark.sql.catalog.your_catalog_name.jdbc.query-timeout-hint-degree | 否 | -1 | Int | 通过向生成的 SQL 添加 /*+ query_timeout(n) */ hint 来控制查询超时时间。通过该参数可以设置超时时间,单位为微妙。默认为-1,表示不添加该Hint。 |
| spark.sql.catalog.your_catalog_name.jdbc.query-hints | 否 | String | 向SELECT查询语句添加额外的OceanBase查询提示。可指定多个提示,用空格分隔,如'READ_CONSISTENCY(WEAK) query_timeout(10000000)'。 | |
| spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition | 否 | false | Boolean | 如果为true,则将禁止主键表使用 where 子句进行分区。 |
| spark.sql.catalog.your_catalog_name.jdbc.{database}.{table}.partition-column | 否 | String | 您可以手动指定主键表分区列,否则将默认自动从主键列中选择一个。 | |
| spark.sql.catalog.your_catalog_name.jdbc.enable-autocommit | 否 | false | Boolean | 使用 jdbc 写入时,是否启用 autocommit 进行事务自动提交。 |
| spark.sql.catalog.your_catalog_name.jdbc.use-insert-ignore | 否 | false | Boolean | 当启用时,使用 INSERT IGNORE 而不是 INSERT ... ON DUPLICATE KEY UPDATE 来处理主键冲突。INSERT IGNORE 会跳过具有重复键的行并继续处理,而 ON DUPLICATE KEY UPDATE 会用新值更新现有行。 |
| spark.sql.catalog.your_catalog_name.jdbc.upsert-by-unique-key | 否 | false | Boolean | 当表同时拥有主键和唯一键索引时,此选项控制使用哪个键来进行冲突检测。如果设置为 true,则使用唯一键进行冲突检测,并更新除唯一键列之外的所有列(包括主键列)。如果设置为 false(默认值),则使用主键进行冲突检测,并更新除主键列之外的所有列。 |
| spark.sql.catalog.your_catalog_name.jdbc.optimize-decimal-string-comparison | 否 | false | Boolean | 当该选项为 true 时,精度小于等于 19 的 DECIMAL(P, 0) 列将被转换为 BIGINT (LongType),以避免与字符串字面量比较时的精度丢失问题。此优化可防止 Spark 将 String + DECIMAL 转换为 DOUBLE(对于大数字会丢失精度)。如果设置为 false(默认值),DECIMAL 列将保持为 DecimalType。注意:此优化仅适用于 scale = 0 的整数型 DECIMAL,且 precision <= 19(BIGINT 范围:-9223372036854775808 到 9223372036854775807)。 |
| spark.sql.catalog.your_catalog_name.string-as-varchar-length | 否 | 1024 | Int | 通过Spark-SQL创建OceanBase表时,映射 Spark String 类型到 OceanBase VARCHAR 类型的长度。默认值:1024。 |
| spark.sql.catalog.your_catalog_name.enable-string-as-text | 否 | false | Boolean | 当该选项为 true 时,通过 Spark-SQL 创建 OceanBase 表时,spark 的 String 类型会转换为 OceanBase 的 text 类型。 |
| spark.sql.catalog.your_catalog_name.enable-spark-varchar-datatype | 否 | false | Boolean | 当该选项为 true 时,OceanBase 的 VARCHAR 类型将会被转换为 spark 的 VARCHAR 类型。需要注意的是,spark VARCHAR 类型是一个实验性的功能。 |
| spark.sql.catalog.your_catalog_name.enable-always-nullable | 否 | true | Boolean | 在模式推断过程中强制将所有字段标记为可空,忽略数据库元数据中的非空约束。用于处理元数据不完整或包含隐性空值的数据源。 |
| spark.sql.defaultCatalog | 否 | String | 设置 Spark SQL默认 catalog。 |
| 参数名 | 是否必须 | 默认值 | 类型 | 描述 |
|---|---|---|---|---|
| spark.sql.catalog.your_catalog_name.direct-load.enabled | 否 | false | Boolean | 是否开启旁路导入写入。 |
| spark.sql.catalog.your_catalog_name.direct-load.host | 否 | String | 旁路导入用到的host地址。 | |
| spark.sql.catalog.your_catalog_name.direct-load.rpc-port | 否 | 2882 | Integer | 旁路导入用到的rpc端口。 |
| spark.sql.catalog.your_catalog_name.direct-load.username | 否 | String | 旁路导入用户名。如果不指定该配置,则使用jdbc用户名。 | |
| spark.sql.catalog.your_catalog_name.direct-load.odp-mode | 否 | false | Boolean | 是否通过ODP代理进行旁路导入。当设置为true时,将通过ODP代理(通常是2885端口)连接,此时会传递完整的用户名格式(如user@tenant#cluster);当设置为false(默认)时,直连OBServer(通常是2882端口)。 |
| spark.sql.catalog.your_catalog_name.direct-load.parallel | 否 | 8 | Integer | 旁路导入服务端的并发度。该参数决定了服务端使用多少cpu资源来处理本次导入任务。 |
| spark.sql.catalog.your_catalog_name.direct-load.batch-size | 否 | 10240 | Integer | 一次写入OceanBase的批大小。 |
| spark.sql.catalog.your_catalog_name.direct-load.max-error-rows | 否 | 0 | Long | 旁路导入任务最大可容忍的错误行数目。 |
| spark.sql.catalog.your_catalog_name.direct-load.timeout | 否 | 7d | Duration | 旁路导入任务的超时时间。 |
| spark.sql.catalog.your_catalog_name.direct-load.heartbeat-timeout | 否 | 60s | Duration | 旁路导入任务客户端的心跳超时时间。 |
| spark.sql.catalog.your_catalog_name.direct-load.heartbeat-interval | 否 | 10s | Duration | 旁路导入任务客户端的心跳间隔时间。 |
| spark.sql.catalog.your_catalog_name.direct-load.load-method | 否 | full | String | 旁路导入导入模式:full, inc, inc_replace。
|
| spark.sql.catalog.your_catalog_name.direct-load.dup-action | 否 | REPLACE | String | 旁路导入任务中主键重复时的处理策略。可以是 STOP_ON_DUP(本次导入失败),REPLACE(替换)或 IGNORE(忽略)。 |
本节介绍在读写 OceanBase 数据时,OceanBase 数据类型与 Spark SQL 数据类型之间的映射关系。
从 OceanBase MySQL 模式读取数据时的类型映射:
| OceanBase 类型 | Spark SQL 类型 | 说明 |
|---|---|---|
| 基础数据类型 | ||
| BIT | LongType | 当 size != 1 时 |
| BIT | BooleanType | 当 size = 1 时 |
| TINYINT | BooleanType | |
| SMALLINT | ShortType | |
| INT / INTEGER | IntegerType | |
| BIGINT | LongType | |
| FLOAT | FloatType | |
| DOUBLE | DoubleType | |
| DECIMAL(p,s) | DecimalType(p,s) | |
| CHAR(n) | StringType | |
| VARCHAR(n) | StringType | |
| TEXT | StringType | |
| BINARY | BinaryType | |
| DATE | DateType | |
| DATETIME / TIMESTAMP | TimestampType | |
| 复杂数据类型 | ||
| ARRAY(type) | ArrayType(对应类型) | 支持最多 6 层嵌套,支持的元素类型包括:INT, BIGINT, FLOAT, DOUBLE, BOOLEAN, STRING 等 |
| VECTOR(n) | ArrayType(FloatType) | n 表示向量维度 |
| MAP(keyType, valueType) | MapType(keyType, valueType) | |
| JSON | StringType | JSON 数据以字符串形式读取 |
| ENUM(...) | StringType | |
| SET(...) | StringType | |
向已存在的 OceanBase MySQL 模式表写入数据时的类型映射:
| Spark SQL 类型 | OceanBase 类型 | 说明 |
|---|---|---|
| 基础数据类型 | ||
| BooleanType | BOOLEAN / TINYINT | |
| ByteType | TINYINT | |
| ShortType | SMALLINT | |
| IntegerType | INT | |
| LongType | BIGINT | |
| FloatType | FLOAT | |
| DoubleType | DOUBLE | |
| DecimalType(p,s) | DECIMAL(p,s) | |
| StringType | CHAR / VARCHAR / TEXT | 根据目标列的实际类型 |
| BinaryType | BINARY | |
| DateType | DATE | |
| TimestampType | DATETIME / TIMESTAMP | |
| 复杂数据类型 | ||
| ArrayType(IntegerType) | INT[] / ARRAY(INT) | 表必须预先创建包含 ARRAY 类型的列 |
| ArrayType(FloatType) | FLOAT[] / VECTOR(n) | 可以写入到 VECTOR 或 FLOAT ARRAY 列 |
| MapType(keyType, valueType) | MAP(keyType, valueType) | 表必须预先创建包含 MAP 类型的列 |
| StringType | JSON | JSON 数据以字符串形式写入 |
| StringType | ENUM / SET | 写入的字符串值必须符合 ENUM/SET 定义 |
从 OceanBase Oracle 模式读取数据时的类型映射:
| OceanBase 类型 | Spark SQL 类型 | 说明 |
|---|---|---|
| NUMBER(0) | DecimalType | 未指定精度和标度时 |
| NUMBER(p,s) | DecimalType(p,s) | |
| BINARY_FLOAT | FloatType | |
| BINARY_DOUBLE | DoubleType | |
| VARCHAR2(n) | StringType | |
| CLOB | StringType | |
| RAW(n) | BinaryType | |
| DATE | DateType | |
| TIMESTAMP | TimestampType | |
| TIMESTAMP WITH TIME ZONE | TimestampType | 在特定时区配置下支持 |
向已存在的 OceanBase Oracle 模式表写入数据时的类型映射:
| Spark SQL 类型 | OceanBase 类型 | 说明 |
|---|---|---|
| BooleanType | NUMBER(1) | |
| ByteType | NUMBER(3) | |
| ShortType | NUMBER(5) | |
| IntegerType | NUMBER(10) | |
| LongType | NUMBER(19) | |
| FloatType | BINARY_FLOAT | |
| DoubleType | BINARY_DOUBLE | |
| DecimalType(p,s) | NUMBER(p,s) | |
| StringType | VARCHAR2 / CLOB | 根据目标列的实际类型 |
| BinaryType | RAW | |
| DateType | DATE | |
| TimestampType | TIMESTAMP |
- 复杂类型表必须预先创建:通过 SQL 直接在 OceanBase 中创建包含复杂类型的表,然后才能通过 Spark 读写数据。
- 嵌套数组限制:ARRAY 类型最多支持 6 层嵌套,例如
INT[][]或INT[][][]。 - JSON 类型处理:JSON 数据在 Spark 中以 StringType 表示,写入时确保字符串内容是有效的 JSON 格式。
- ENUM 和 SET 类型:在 Spark 中以 StringType 表示,写入时值必须符合表定义中的枚举或集合值。