Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ case class HBaseRelation(

import scala.collection.JavaConverters._
private val config = new OBKVHbaseConfig(parameters.asJava)
private val userSchema: StructType = parseCatalog(config.getSchema)

// Parse catalog once and store results as instance variables
private val (
userSchema: StructType,
rowKey: String,
columnFamilyMap: mutable.Map[String, (String, String)]) =
parseCatalog(config.getSchema)

override def schema: StructType = userSpecifiedSchema.getOrElse(userSchema)

Expand Down Expand Up @@ -81,9 +87,9 @@ case class HBaseRelation(

buffer.foreach(
row => {
// Get field index by col name that defined in catalog
val rowKeyIndex = row.schema.fieldIndex(HBaseRelation.rowKey)
val rowKey: Array[Byte] = convertToBytes(row(rowKeyIndex))
// Get field index by col name that defined in catalog (using instance variable now)
val rowKeyIndex = row.schema.fieldIndex(rowKey)
val rowKeyBytes: Array[Byte] = convertToBytes(row(rowKeyIndex))

// Group columns by family
val columnsByFamily = new mutable.HashMap[String, ArrayBuffer[(String, Array[Byte])]]()
Expand All @@ -92,10 +98,10 @@ case class HBaseRelation(
for (i <- 0 until (row.size)) {
if (i != rowKeyIndex) {
val rowFieldName = row.schema.fieldNames(i)
// Only write columns defined by the user in the schema.
if (HBaseRelation.columnFamilyMap.contains(rowFieldName)) {
val userFieldName = HBaseRelation.columnFamilyMap(rowFieldName)._1
val cfName = HBaseRelation.columnFamilyMap(rowFieldName)._2
// Only write columns defined by the user in the schema (using instance variable now)
if (columnFamilyMap.contains(rowFieldName)) {
val userFieldName = columnFamilyMap(rowFieldName)._1
val cfName = columnFamilyMap(rowFieldName)._2
val columnValue = convertToBytes(row.get(i))

columnsByFamily.getOrElseUpdate(cfName, ArrayBuffer.empty) +=
Expand All @@ -107,7 +113,7 @@ case class HBaseRelation(
// Create one Put per family with the same rowKey
columnsByFamily.foreach {
case (cfName, columns) =>
val put: Put = new Put(rowKey)
val put: Put = new Put(rowKeyBytes)
val familyName: Array[Byte] = Bytes.toBytes(cfName)
columns.foreach {
case (colName, colValue) =>
Expand Down Expand Up @@ -135,24 +141,26 @@ object HBaseRelation {
private val COLUMN_NAME = "col"
private val COLUMN_TYPE = "type"

private var rowKey = ""

/**
* Mapping DataFrame's schema to User-defined schema:
*
* <p> "address": {"cf": "family1","col": "officeAddress","type": "string"}
*
* <p> [address, (officeAddress, family1)]
* Parse catalog JSON and return schema along with metadata. Returns a tuple of (StructType,
* rowKey, columnFamilyMap) to ensure proper serialization.
*
* <p> [user define obkv-hbase col name, (dataframe schema col name, family name)]
* @param catalogJson
* JSON string defining the catalog schema
* @return
* Tuple of (StructType schema, String rowKey, Map columnFamilyMap)
*/
private val columnFamilyMap: mutable.Map[String, (String, String)] =
mutable.LinkedHashMap.empty[String, (String, String)]

def parseCatalog(catalogJson: String): StructType = {
def parseCatalog(
catalogJson: String): (StructType, String, mutable.Map[String, (String, String)]) = {
JsonMethods.mapper.configure(Feature.ALLOW_SINGLE_QUOTES, true)
val jObject: JObject = parse(catalogJson).asInstanceOf[JObject]
val schemaMap = mutable.LinkedHashMap.empty[String, Field]

// Local variables instead of static ones
var rowKey = ""
val columnFamilyMap: mutable.Map[String, (String, String)] =
mutable.LinkedHashMap.empty[String, (String, String)]

getColsPreservingOrder(jObject).foreach {
case (name, column) =>
if (column(CF).equalsIgnoreCase("rowKey"))
Expand All @@ -169,7 +177,7 @@ object HBaseRelation {
StructField(name, CatalystSqlParser.parseDataType(field.columnType))
}.toSeq

StructType(fields)
(StructType(fields), rowKey, columnFamilyMap)
}

private def getColsPreservingOrder(jObj: JObject): Seq[(String, Map[String, String])] = {
Expand Down