Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.oceanbase.spark.utils.OBJdbcUtils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -114,6 +115,10 @@ class OceanBaseCatalog

override def loadTable(ident: Identifier): Table = {
checkNamespace(ident.namespace())
// Check if table exists first to support UNCACHE TABLE IF EXISTS in Spark 3.3+
if (!tableExists(ident)) {
throw new NoSuchTableException(ident)
}
val config = genNewOceanBaseConfig(this.config, ident)
try {
val schema = resolveTable(config, dialect)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,38 @@ class OBCatalogMySQLITCase extends OceanBaseMySQLTestBase {
session.stop()
}

@Test
def testUncacheTableIfExists(): Unit = {
val session = SparkSession
.builder()
.master("local[*]")
.config("spark.sql.catalog.ob", OB_CATALOG_CLASS)
.config("spark.sql.catalog.ob.url", getJdbcUrl)
.config("spark.sql.catalog.ob.username", getUsername)
.config("spark.sql.catalog.ob.password", getPassword)
.config("spark.sql.catalog.ob.schema-name", getSchemaName)
.getOrCreate()

session.sql("use ob;")
insertTestData(session, "products")

// Test UNCACHE TABLE IF EXISTS with existing table - should succeed
Assertions.assertDoesNotThrow(new ThrowingSupplier[Unit] {
override def get(): Unit = {
session.sql("UNCACHE TABLE IF EXISTS products")
}
})

// Test UNCACHE TABLE IF EXISTS with non-existing table - should not throw exception
Assertions.assertDoesNotThrow(new ThrowingSupplier[Unit] {
override def get(): Unit = {
session.sql("UNCACHE TABLE IF EXISTS non_existing_table")
}
})

session.stop()
}

@Test
def testCatalogOp(): Unit = {
val session = SparkSession
Expand Down