Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/spark-catalog-oceanbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,13 @@ Precautions for direct-load:
<td>Int</td>
<td>Control the query timeout by adding /*+ query_timeout(N) */ hint to the generated SQL. This parameter can be used to specify the timeout in microseconds. The default value is -1, which means that the hint is not added.</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.query-hints</td>
<td>No</td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>Additional OceanBase query hints added to SELECT query statements. Multiple hints can be specified separated by spaces, e.g. 'READ_CONSISTENCY(WEAK) query_timeout(10000000)'.</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
<td>No</td>
Expand Down
7 changes: 7 additions & 0 deletions docs/spark-catalog-oceanbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@ select * from spark_catalog.default.orders;
<td>Int</td>
<td>通过向生成的 SQL 添加 /*+ query_timeout(n) */ hint 来控制查询超时时间。通过该参数可以设置超时时间,单位为微妙。默认为-1,表示不添加该Hint。</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.query-hints</td>
<td>否</td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>向SELECT查询语句添加额外的OceanBase查询提示。可指定多个提示,用空格分隔,如'READ_CONSISTENCY(WEAK) query_timeout(10000000)'。</td>
</tr>
<tr>
<td>spark.sql.catalog.your_catalog_name.jdbc.disable-pk-table-use-where-partition</td>
<td>否</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang3.StringUtils;

public class OceanBaseConfig extends Config implements Serializable {
private static final String EMPTY_STRING = "";

public static final ConfigEntry<String> URL =
new ConfigBuilder("url")
Expand Down Expand Up @@ -284,6 +285,14 @@ public class OceanBaseConfig extends Config implements Serializable {
.intConf()
.createWithDefault(-1);

public static final ConfigEntry<String> JDBC_QUERY_HINT_DEGREE =
new ConfigBuilder("jdbc.query-hints")
.doc(
"Additional OceanBase query hints added to SELECT query statements. Multiple hints can be specified separated by spaces, e.g. 'READ_CONSISTENCY(WEAK) query_timeout(10000000)'.")
.version(ConfigConstants.VERSION_1_4_0)
.stringConf()
.createWithDefault(EMPTY_STRING);

public static final ConfigEntry<Integer> JDBC_STATISTICS_PARALLEL_HINT_DEGREE =
new ConfigBuilder("jdbc.statistics-parallel-hint-degree")
.doc(
Expand Down Expand Up @@ -613,6 +622,10 @@ public Integer getQueryTimeoutHintDegree() {
return get(JDBC_QUERY_TIMEOUT_HINT_DEGREE);
}

public String getQueryHintDegree() {
return get(JDBC_QUERY_HINT_DEGREE);
}

public Integer getJdbcStatsParallelHintDegree() {
return get(JDBC_STATISTICS_PARALLEL_HINT_DEGREE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,27 +179,27 @@ class OBJdbcReader(
else
""
val useHiddenPKColumnHint = if (part.useHiddenPKColumn) {
s", opt_param('hidden_column_visible', 'true') "
s" opt_param('hidden_column_visible', 'true') "
} else {
""
}
(myLimitClause, useHiddenPKColumnHint)
case part: OBOraclePartition =>
val useHiddenPKColumnHint = if (part.useHiddenPKColumn) {
s", opt_param('hidden_column_visible', 'true') "
s" opt_param('hidden_column_visible', 'true') "
} else {
""
}
("", useHiddenPKColumnHint)
case _ => throw new RuntimeException(s"Unsupported partition type: ${partition.getClass}")
}
val queryTimeoutHint = if (config.getQueryTimeoutHintDegree > 0) {
s", query_timeout(${config.getQueryTimeoutHintDegree}) "
s" query_timeout(${config.getQueryTimeoutHintDegree}) "
} else {
""
}
val hint =
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}) $useHiddenPKColumnHint $queryTimeoutHint */"
s"/*+ PARALLEL(${config.getJdbcParallelHintDegree}) $useHiddenPKColumnHint $queryTimeoutHint ${config.getQueryHintDegree} */"

val partitionClause = partition match {
case part: OBMySQLPartition => part.partitionClause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,44 @@ class OBCatalogMySQLITCase extends OceanBaseMySQLTestBase {
session.stop()
}

@Test
def testJdbcQueryHints(): Unit = {
val session = SparkSession
.builder()
.master("local[*]")
.config("spark.sql.catalog.ob", OB_CATALOG_CLASS)
.config("spark.sql.catalog.ob.url", getJdbcUrl)
.config("spark.sql.catalog.ob.username", getUsername)
.config("spark.sql.catalog.ob.password", getPassword)
.config("spark.sql.catalog.ob.schema-name", getSchemaName)
.config(
"spark.sql.catalog.ob.jdbc.query-hints",
"READ_CONSISTENCY(STRONG) query_timeout(10000000)")
.getOrCreate()

session.sql("use ob;")
insertTestData(session, "products")
queryAndVerifyTableData(session, "products", expected)
session.stop()

// empty test case
val session1 = SparkSession
.builder()
.master("local[*]")
.config("spark.sql.catalog.ob", OB_CATALOG_CLASS)
.config("spark.sql.catalog.ob.url", getJdbcUrl)
.config("spark.sql.catalog.ob.username", getUsername)
.config("spark.sql.catalog.ob.password", getPassword)
.config("spark.sql.catalog.ob.schema-name", getSchemaName)
.config("spark.sql.catalog.ob.jdbc.query-hints", "")
.getOrCreate()

session1.sql("use ob;")
insertTestData(session1, "products")
queryAndVerifyTableData(session1, "products", expected)
session1.stop()
}

private def queryAndVerifyTableData(
session: SparkSession,
tableName: String,
Expand Down