Skip to content

Commit d24172f

Browse files
yuanoOowhhe
authored andcommitted
BugFix: Fix the dbtable parameter creation error under jdbc sink. (#13)
1 parent 0a0da11 commit d24172f

File tree

3 files changed

+23
-9
lines changed

3 files changed

+23
-9
lines changed

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/com/oceanbase/spark/jdbc/OBJdbcUtils.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,22 @@ object OBJdbcUtils {
3636
val statement = conn.createStatement
3737
try {
3838
val rs = statement.executeQuery("SHOW VARIABLES LIKE 'ob_compatibility_mode'")
39-
if (rs.next) rs.getString("VALUE") else null
39+
if (rs.next) rs.getString("VALUE")
40+
else throw new RuntimeException("Failed to obtain compatible mode of OceanBase.")
4041
} finally {
4142
statement.close()
4243
conn.close()
4344
}
4445
}
4546

47+
def getDbTable(oceanBaseConfig: OceanBaseConfig): String = {
48+
if ("MySQL".equalsIgnoreCase(getCompatibleMode(oceanBaseConfig))) {
49+
s"`${oceanBaseConfig.getSchemaName}`.`${oceanBaseConfig.getTableName}`"
50+
} else {
51+
s""""${oceanBaseConfig.getSchemaName}"."${oceanBaseConfig.getTableName}""""
52+
}
53+
}
54+
4655
def truncateTable(oceanBaseConfig: OceanBaseConfig): Unit = {
4756
val conn = getConnection(oceanBaseConfig)
4857
val statement = conn.createStatement

spark-connector-oceanbase/spark-connector-oceanbase-base/src/main/scala/org/apache/spark/sql/OceanBaseSparkDataSource.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
package org.apache.spark.sql
1717

1818
import com.oceanbase.spark.config.OceanBaseConfig
19-
import com.oceanbase.spark.jdbc.OBJdbcUtils.getCompatibleMode
19+
import com.oceanbase.spark.jdbc.OBJdbcUtils.{getCompatibleMode, getDbTable}
2020
import com.oceanbase.spark.sql.OceanBaseSparkSource
2121

22-
import OceanBaseSparkDataSource.{JDBC_TXN_ISOLATION_LEVEL, JDBC_URL, JDBC_USER, SHORT_NAME}
22+
import OceanBaseSparkDataSource.{JDBC_TXN_ISOLATION_LEVEL, JDBC_URL, JDBC_USER, OCEANBASE_DEFAULT_ISOLATION_LEVEL, SHORT_NAME}
2323
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCRelation, JdbcRelationProvider}
2424
import org.apache.spark.sql.jdbc.{JdbcDialects, OceanBaseMySQLDialect, OceanBaseOracleDialect}
2525
import org.apache.spark.sql.sources._
@@ -65,7 +65,7 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider {
6565
JDBC_URL -> oceanBaseConfig.getURL,
6666
JDBC_USER -> parameters(OceanBaseConfig.USERNAME.getKey),
6767
JDBC_TXN_ISOLATION_LEVEL -> {
68-
if (!parameters.contains(JDBC_TXN_ISOLATION_LEVEL)) "READ_COMMITTED"
68+
if (!parameters.contains(JDBC_TXN_ISOLATION_LEVEL)) OCEANBASE_DEFAULT_ISOLATION_LEVEL
6969
else parameters(JDBC_TXN_ISOLATION_LEVEL)
7070
}
7171
)
@@ -74,7 +74,7 @@ class OceanBaseSparkDataSource extends JdbcRelationProvider {
7474
paraMap =
7575
paraMap + (JDBCOptions.JDBC_QUERY_STRING -> parameters(JDBCOptions.JDBC_QUERY_STRING))
7676
} else {
77-
paraMap = paraMap + (JDBCOptions.JDBC_TABLE_NAME -> oceanBaseConfig.getTableName)
77+
paraMap = paraMap + (JDBCOptions.JDBC_TABLE_NAME -> getDbTable(oceanBaseConfig))
7878
}
7979

8080
// Set dialect
@@ -92,4 +92,5 @@ object OceanBaseSparkDataSource {
9292
val JDBC_URL = "url"
9393
val JDBC_USER = "user"
9494
val JDBC_TXN_ISOLATION_LEVEL = "isolationLevel"
95+
val OCEANBASE_DEFAULT_ISOLATION_LEVEL = "READ_COMMITTED"
9596
}

spark-connector-oceanbase/spark-connector-oceanbase-base/src/test/scala/com/oceanbase/spark/OceanBaseMySQLConnectorITCase.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
3434
|CREATE TEMPORARY VIEW test_sink
3535
|USING oceanbase
3636
|OPTIONS(
37-
| "url"= "$getJdbcUrl",
37+
| "url"= "$getJdbcUrlWithoutDB",
3838
| "rpc-port" = "$getRpcPort",
3939
| "schema-name"="$getSchemaName",
4040
| "table-name"="products",
@@ -89,7 +89,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
8989
|CREATE TEMPORARY VIEW test_sink
9090
|USING oceanbase
9191
|OPTIONS(
92-
| "url"= "$getJdbcUrl",
92+
| "url"= "$getJdbcUrlWithoutDB",
9393
| "rpc-port" = "$getRpcPort",
9494
| "schema-name"="$getSchemaName",
9595
| "table-name"="products",
@@ -206,7 +206,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
206206
|CREATE TEMPORARY VIEW test_sink
207207
|USING oceanbase
208208
|OPTIONS(
209-
| "url"= "$getJdbcUrl",
209+
| "url"= "$getJdbcUrlWithoutDB",
210210
| "rpc-port" = "$getRpcPort",
211211
| "schema-name"="$getSchemaName",
212212
| "table-name"="products",
@@ -437,7 +437,7 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
437437
// DataFrame read
438438
val dataFrame = session.read
439439
.format("oceanbase")
440-
.option("url", getJdbcUrl)
440+
.option("url", getJdbcUrlWithoutDB)
441441
.option("username", getUsername)
442442
.option("password", getPassword)
443443
.option("table-name", "products")
@@ -469,6 +469,10 @@ class OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {
469469
session.stop()
470470
dropTables("products")
471471
}
472+
473+
def getJdbcUrlWithoutDB: String =
474+
s"jdbc:mysql://$getHost:$getPort?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
475+
472476
}
473477

474478
object OceanBaseMySQLConnectorITCase extends OceanBaseMySQLTestBase {

0 commit comments

Comments
 (0)