OBKVHbaseConfig.java
@@ -117,14 +117,6 @@ public class OBKVHbaseConfig extends Config {
.checkValue(size -> size > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
.createWithDefault(10_000);

public static final ConfigEntry<String> SCHEMA =
new ConfigBuilder("schema")
.doc("The schema of the obkv-hbase table")
.version(ConfigConstants.VERSION_1_0_0)
.stringConf()
.checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG)
.create();

public String getURL() {
return get(URL);
}
@@ -173,10 +165,6 @@ public Integer getBatchSize() {
return get(BATCH_SIZE);
}

public String getSchema() {
return get(SCHEMA);
}

public OBKVHbaseConfig(Map<String, String> properties) {
super();
loadFromMap(properties, k -> true);
HBaseRelation.scala
@@ -15,22 +15,17 @@
*/
package com.oceanbase.spark

import com.oceanbase.spark.HBaseRelation.{convertToBytes, parseCatalog}
import com.oceanbase.spark.HBaseRelation.convertToBytes
import com.oceanbase.spark.config.OBKVHbaseConfig
import com.oceanbase.spark.obkv.HTableClientUtils

import com.fasterxml.jackson.core.JsonParser.Feature
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
import org.apache.spark.sql.types.{StructField, StructType}
import org.json4s.JsonAST.JObject
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}
@@ -50,9 +45,30 @@ case class HBaseRelation(

import scala.collection.JavaConverters._
private val config = new OBKVHbaseConfig(parameters.asJava)
private val userSchema: StructType = parseCatalog(config.getSchema)

override def schema: StructType = userSpecifiedSchema.getOrElse(userSchema)
// Validate and extract schema metadata
require(userSpecifiedSchema.isDefined, "Schema must be specified")
private val providedSchema = userSpecifiedSchema.get

// Validate schema structure
require(providedSchema.fields.nonEmpty, "Schema must have at least one field")

// First field is rowkey (can be any atomic type)
private val rowkeyField = providedSchema.fields.head
require(
!rowkeyField.dataType.isInstanceOf[StructType],
s"First field '${rowkeyField.name}' must be an atomic type (rowkey), not STRUCT")

// Other fields must be STRUCT types (column families)
private val columnFamilyFields = providedSchema.fields.tail
columnFamilyFields.foreach {
field =>
require(
field.dataType.isInstanceOf[StructType],
s"Field '${field.name}' must be a STRUCT type representing a column family")
}

override def schema: StructType = providedSchema
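
// Editorial note, a hedged example: the field names below are hypothetical,
// but this is the shape the checks above accept. The first field is the
// atomic rowkey; every remaining field is a STRUCT modeling one column
// family, with one entry per column qualifier.
//
//   import org.apache.spark.sql.types._
//   val exampleSchema = StructType(Seq(
//     StructField("rowkey", StringType),
//     StructField("family1", StructType(Seq(
//       StructField("officeAddress", StringType),
//       StructField("age", IntegerType)
//     )))
//   ))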

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
throw new NotImplementedError("Reading from obkv-hbase is not supported")
@@ -79,27 +95,36 @@
// Group puts by column family to handle multiple families correctly
val familyPutListMap = mutable.HashMap.empty[String, util.ArrayList[Put]]

buffer.foreach(
row => {
// Get field index by col name that defined in catalog
val rowKeyIndex = row.schema.fieldIndex(HBaseRelation.rowKey)
val rowKey: Array[Byte] = convertToBytes(row(rowKeyIndex))
buffer.foreach {
row =>
// First field is rowkey
val rowKey: Array[Byte] = convertToBytes(row.get(0))

// Group columns by family
val columnsByFamily = new mutable.HashMap[String, ArrayBuffer[(String, Array[Byte])]]()

// Mapping DataFrame's schema to User-defined schema
for (i <- 0 until (row.size)) {
if (i != rowKeyIndex) {
val rowFieldName = row.schema.fieldNames(i)
// Only write columns defined by the user in the schema.
if (HBaseRelation.columnFamilyMap.contains(rowFieldName)) {
val userFieldName = HBaseRelation.columnFamilyMap(rowFieldName)._1
val cfName = HBaseRelation.columnFamilyMap(rowFieldName)._2
val columnValue = convertToBytes(row.get(i))

columnsByFamily.getOrElseUpdate(cfName, ArrayBuffer.empty) +=
((userFieldName, columnValue))
// Process each column family (STRUCT fields starting from index 1)
for (i <- 1 until row.size) {
val cfFieldIndex = i - 1 // Index in columnFamilyFields array
val cfField = columnFamilyFields(cfFieldIndex)
val cfName = cfField.name
val cfStruct = cfField.dataType.asInstanceOf[StructType]

// Get the STRUCT value for this column family
val cfRow = row.getStruct(i)

if (cfRow != null) {
// Process each column qualifier in this family
for (j <- 0 until cfStruct.fields.length) {
val qualifierField = cfStruct.fields(j)
val qualifierName = qualifierField.name
val qualifierValue = cfRow.get(j)

if (qualifierValue != null) {
val columnValue = convertToBytes(qualifierValue)
columnsByFamily.getOrElseUpdate(cfName, ArrayBuffer.empty) +=
((qualifierName, columnValue))
}
}
}
}
@@ -116,69 +141,21 @@

familyPutListMap.getOrElseUpdate(cfName, new util.ArrayList[Put]()).add(put)
}
})
}

// Flush each family's puts separately
familyPutListMap.values.foreach(
putList => {
familyPutListMap.values.foreach {
putList =>
if (!putList.isEmpty) {
hTableClient.put(putList)
}
})
}

buffer.clear()
}
}
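
// Editorial note, a hedged sketch of what the write path above produces.
// Assuming the hypothetical schema (rowkey STRING, family1
// STRUCT<officeAddress: STRING, age: INT>) and one input row
// ("row-1", {"Beijing", 30}), the loop builds one Put per column family
// for each row and batches it under that family before flushing:
//
//   import org.apache.hadoop.hbase.client.Put
//   import org.apache.hadoop.hbase.util.Bytes
//   val put = new Put(Bytes.toBytes("row-1"))
//   put.addColumn(Bytes.toBytes("family1"), Bytes.toBytes("officeAddress"), Bytes.toBytes("Beijing"))
//   put.addColumn(Bytes.toBytes("family1"), Bytes.toBytes("age"), Bytes.toBytes(30))
//   // familyPutListMap now holds "family1" -> [put]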

object HBaseRelation {
private val CF = "cf"
private val COLUMN_NAME = "col"
private val COLUMN_TYPE = "type"

private var rowKey = ""

/**
* Mapping DataFrame's schema to User-defined schema:
*
* <p> "address": {"cf": "family1","col": "officeAddress","type": "string"}
*
* <p> [address, (officeAddress, family1)]
*
* <p> [user define obkv-hbase col name, (dataframe schema col name, family name)]
*/
private val columnFamilyMap: mutable.Map[String, (String, String)] =
mutable.LinkedHashMap.empty[String, (String, String)]

def parseCatalog(catalogJson: String): StructType = {
JsonMethods.mapper.configure(Feature.ALLOW_SINGLE_QUOTES, true)
val jObject: JObject = parse(catalogJson).asInstanceOf[JObject]
val schemaMap = mutable.LinkedHashMap.empty[String, Field]
getColsPreservingOrder(jObject).foreach {
case (name, column) =>
if (column(CF).equalsIgnoreCase("rowKey"))
rowKey = column(COLUMN_NAME)
else
columnFamilyMap.put(column(COLUMN_NAME), (name, column(CF)))

val filed = Field(column(CF), column(COLUMN_NAME), column(COLUMN_TYPE))
schemaMap.+=((name, filed))
}

val fields: Seq[StructField] = schemaMap.map {
case (name, field) =>
StructField(name, CatalystSqlParser.parseDataType(field.columnType))
}.toSeq

StructType(fields)
}

private def getColsPreservingOrder(jObj: JObject): Seq[(String, Map[String, String])] = {
jObj.obj.map {
case (name, jValue) =>
(name, jValue.values.asInstanceOf[Map[String, String]])
}
}

def convertToBytes(data: Any): Array[Byte] = data match {
case null => null
case _: Boolean => Bytes.toBytes(data.asInstanceOf[Boolean])
@@ -199,5 +176,3 @@ object HBaseRelation {
throw new UnsupportedOperationException(s"Unsupported type: ${data.getClass.getSimpleName}")
}
}

case class Field(cf: String, columnName: String, columnType: String)
OBKVHBaseSparkSource.scala
@@ -23,14 +23,24 @@ import org.apache.spark.sql.types._
class OBKVHBaseSparkSource
extends DataSourceRegister
with RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider {

override def shortName(): String = "obkv-hbase"

override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
HBaseRelation(parameters, None)(sqlContext)
throw new UnsupportedOperationException(
"OBKV-HBase connector requires a user-specified schema. " +
"Please use CREATE TEMPORARY VIEW with schema definition.")
}

override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
HBaseRelation(parameters, Some(schema))(sqlContext)
}

override def createRelation(
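
// Editorial note, a hedged usage sketch: with the SchemaRelationProvider
// overload above, callers declare the schema on a temporary view and write
// through INSERT INTO (reads are unsupported, see buildScan). The option
// block is elided because the real option keys are not shown in this diff.
//
//   spark.sql("""
//     CREATE TEMPORARY VIEW htable (
//       rowkey STRING,
//       family1 STRUCT<officeAddress: STRING, age: INT>
//     ) USING `obkv-hbase`
//     OPTIONS (...)
//   """)
//   spark.sql("INSERT INTO htable VALUES ('row-1', " +
//     "named_struct('officeAddress', 'Beijing', 'age', 30))")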