SparkSQL + HBase Data Loading

NewAPIHadoopRDD

val context: SparkContext = sparkSession.sparkContext
// HBase configuration: register the input table for TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, Constants.HTAB_ORDER)
val scan = new Scan
// TODO: add scan filter conditions
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rddResult: RDD[(ImmutableBytesWritable, Result)] =
  context.newAPIHadoopRDD(
    conf,                             // HBase configuration
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])
import sparkSession.implicits._
// Convert each HBase Result into an Order
val orderRDD: RDD[Order] = rddResult.mapPartitions(eachPartition => {
  val orders: Iterator[Order] = eachPartition.map(eachResult => {
    val value: Result = eachResult._2
    // extract each field from the Result, e.g. Bytes.toString(value.getValue(family, qualifier))
    Order(order_id, city_id, start_time, end_time)
  })
  orders
})
orderRDD.toDF

def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}
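
The snippet stops at orderRDD.toDF; as a small follow-on sketch (the view name and query below are illustrative, reusing the Order fields from above), the resulting DataFrame can be registered as a temporary view and queried with Spark SQL:

// Illustrative only: expose the DataFrame to Spark SQL
val orderDF = orderRDD.toDF()
orderDF.createOrReplaceTempView("orders")
sparkSession.sql("select city_id, count(*) as cnt from orders group by city_id").show()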

DataSource V2

  • For the differences between DataSource V1 and V2, refer to this article
DataSource
/**
 * Custom Spark SQL data source:
 * 1. Extend DataSourceV2 to register the data source with Spark
 * 2. Mix in ReadSupport to enable reads
 *    {@link ReadSupport } enables the read path
 */
class HBaseCustomSource extends DataSourceV2 with ReadSupport {
  // Build the custom DataSourceReader from the options passed via .option(...)
  override def createReader(options: DataSourceOptions): DataSourceReader = {
    val hbaseTableName = options.get(Constants.HBASE_TABlE_NAME).get()
    val hbaseTableSchema = options.get(Constants.HBASE_TABLE_SCHEMA).get()
    val sparkSqlTableSchema = options.get(Constants.SPARK_SQL_TABlE_SCHEMA).get()
    new HBaseCustomDataSourceReader(hbaseTableName, sparkSqlTableSchema, hbaseTableSchema)
  }
}
DataSourceReader
/**
 * Custom DataSourceReader.
 * Extends DataSourceReader:
 *  - override readSchema to produce the schema
 *  - override createDataReaderFactories for multi-partition support (create one factory per partition as needed)
 *
 * Note:
 *  {@link SupportsPushDownFilters } filter pushdown
 *  {@link SupportsPushDownRequiredColumns } column pruning
 * 1. Pushed-down predicates must fit the data type, e.g. greater/less-than cannot be pushed for string columns
 * 2. Comparisons on short and byte columns need an explicit cast, e.g. where id = cast(2 as smallint)
 *    (see Spark's ParquetFilters source)
 */
class HBaseCustomDataSourceReader(hbaseTableName: String,
                                  sparkSqlTableSchema: String,
                                  hbaseTableSchema: String) extends DataSourceReader
                with SupportsPushDownFilters
                with SupportsPushDownRequiredColumns {

  // Filters accepted for pushdown
  var supportsFilters = Array.empty[Filter]
  // Pruned schema (only the columns the query actually needs)
  var requiredSchema: StructType = null

  // Filter pushdown
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val supported = ListBuffer.empty[Filter]
    val unsupported = ListBuffer.empty[Filter]

    /**
     * Only push down =, >, >=, <, <=; everything else is left for Spark to evaluate
     */
    filters.foreach {
      case filter: EqualTo => supported += filter
      case filter: GreaterThan => supported += filter
      case filter: GreaterThanOrEqual => supported += filter
      case filter: LessThan => supported += filter
      case filter: LessThanOrEqual => supported += filter
      case filter => unsupported += filter
    }
    this.supportsFilters = supported.toArray[Filter]
    // Return the filters Spark must still apply itself
    unsupported.toArray
  }
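
  // Example (illustrative): for the usage query below, .filter("goodId <= 16") arrives here as
  // LessThanOrEqual("goodId", 16), typically together with an IsNotNull("goodId"); the comparison
  // is pushed down, while the IsNotNull falls into the unsupported list and is returned to Spark.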

  // Filters that have been pushed down
  override def pushedFilters(): Array[Filter] = supportsFilters

  // Columns selected by the query (column pruning)
  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  // Build the schema: use the pruned schema if present, otherwise parse the full DDL string
  override def readSchema(): StructType = {
    if (requiredSchema != null) {
      return requiredSchema
    }
    StructType.fromDDL(sparkSqlTableSchema)
  }

  // Multi-partition support: one DataReaderFactory per partition
  // (a single partition here; see the region-based sketch after this class)
  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
    import scala.collection.JavaConverters._
    Seq(
      new HBaseDataReaderFactory(...).asInstanceOf[DataReaderFactory[Row]]
    ).asJava
  }
}
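
The example above returns a single factory, i.e. the whole scan runs as one partition. As a sketch only (not from the original source), one factory per HBase region could be returned instead, assuming a hypothetical HBaseDataReaderFactory constructor that takes the region's start/stop row so each Spark task scans exactly one region:

override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
  import scala.collection.JavaConverters._
  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.ConnectionFactory

  val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
  try {
    // one (startRow, stopRow) pair per region of the table
    val regionLocator = connection.getRegionLocator(TableName.valueOf(hbaseTableName))
    val startEndKeys = regionLocator.getStartEndKeys
    startEndKeys.getFirst.zip(startEndKeys.getSecond).map { case (startRow, stopRow) =>
      new HBaseDataReaderFactory(hbaseTableName, startRow, stopRow) // hypothetical signature
        .asInstanceOf[DataReaderFactory[Row]]
    }.toSeq.asJava
  } finally {
    connection.close()
  }
}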
DataReaderFactory
/**
 * Factory that creates the DataReader.
 * Factory instances are serialized and shipped to executors, so the factory and anything
 * it carries must be serializable.
 */
class HBaseDataReaderFactory extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = {
    new HBaseCustomDataReader()
  }
}
DataReader
class HBaseCustomDataReader extends DataReader[Row] {
  // HBase connection
  var hbaseConnection: Connection = null
  // Rows read from HBase
  val datas: Iterator[Result] = getIterator

  // Load the HBase data
  def getIterator: Iterator[Result] = {
    val table = hbaseConnection.getTable(TableName.valueOf(hbaseTableName.trim))
    val scan: Scan = new Scan
    // Apply the column pruning and pushed-down filters handed over by the DataSourceReader
    fullScanByColumns(scan, requiredSchemaList)
    fullScanByFilter(scan)
    // Run the scan and expose the results as a Scala iterator
    val scanner = table.getScanner(scan)
    import scala.collection.JavaConverters._
    scanner.iterator.asScala
  }
}
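
The DataReader[Row] contract also requires next, get, and close, which are not shown above. A minimal sketch, where resultToRow is a hypothetical helper that maps an HBase Result onto the pruned schema (reading each family/qualifier with result.getValue and converting it to the Spark SQL type):

// Sketch only: the remaining DataReader[Row] methods
override def next(): Boolean = datas.hasNext

override def get(): Row = {
  val result: Result = datas.next()
  resultToRow(result) // hypothetical Result -> Row conversion
}

override def close(): Unit = {
  if (hbaseConnection != null) {
    hbaseConnection.close()
  }
}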
fullScanByColumns
// Example: add the selected (family, qualifier) columns to the scan
def fullScanByColumns(scan: Scan, hbaseTableSelectedFamilyAndColumn: Seq[(String, String)]): Unit = {
  hbaseTableSelectedFamilyAndColumn.foreach(tuple => {
    scan.addColumn(tuple._1.getBytes, tuple._2.getBytes)
  })
}
fullScanByFilter
// Example: translate the pushed-down filters (supportsFilters) into HBase filters
def fullScanByFilter(scan: Scan): Unit = {
  val filterList = new FilterList()
  supportsFilters.foreach {
    case filter: EqualTo =>
      // TODO: build the matching HBase filter (e.g. a SingleColumnValueFilter with
      // CompareOp.EQUAL, see the sketch below) and add it via filterList.addFilter(...)
    // TODO: handle GreaterThan / GreaterThanOrEqual / LessThan / LessThanOrEqual
    case _ =>
  }
  if (filterList.getFilters.size() > 0) {
    scan.setFilter(filterList)
  }
}
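
As an illustration of the TODO above (a sketch, not the original implementation): an EqualTo could be turned into a SingleColumnValueFilter. The family/qualifier lookup for filter.attribute would come from the hbase table schema mapping; here they are passed in explicitly, and values are assumed to be stored as strings:

import org.apache.hadoop.hbase.filter.{CompareFilter, SingleColumnValueFilter}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.sources.EqualTo

// Sketch only: map a pushed-down EqualTo to an HBase SingleColumnValueFilter
def equalToFilter(filter: EqualTo, family: String, qualifier: String): SingleColumnValueFilter = {
  val hbaseFilter = new SingleColumnValueFilter(
    Bytes.toBytes(family),
    Bytes.toBytes(qualifier),
    CompareFilter.CompareOp.EQUAL,
    Bytes.toBytes(filter.value.toString)) // assumes string-encoded cell values
  // skip rows that do not contain the column at all
  hbaseFilter.setFilterIfMissing(true)
  hbaseFilter
}

Inside the EqualTo case this becomes filterList.addFilter(equalToFilter(filter, family, qualifier)).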

DataWriter
// The writer side is transactional: each DataWriter commits its own output and the
// DataSourceWriter commits or aborts the job as a whole
public interface DataSourceWriter {
  DataWriterFactory<Row> createWriterFactory();
  void commit(WriterCommitMessage[] messages);
  void abort(WriterCommitMessage[] messages);
}

public interface DataWriter<T> {
  void write(T record) throws IOException;
  WriterCommitMessage commit() throws IOException;
  void abort() throws IOException;
}
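
A minimal sketch of how an HBase writer could implement this contract (assumptions: the first column of each Row is the row key, the remaining columns are written as strings into family "f1" as in the usage example below, and the Table is supplied by a surrounding DataWriterFactory):

import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

case class HBaseCommitMessage(rowsWritten: Int) extends WriterCommitMessage

// Sketch only: buffer Puts per task, flush them on commit, drop them on abort
class HBaseDataWriter(table: Table, columnNames: Seq[String]) extends DataWriter[Row] {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[Put]

  override def write(record: Row): Unit = {
    val put = new Put(Bytes.toBytes(record.getString(0)))
    columnNames.zipWithIndex.drop(1).foreach { case (name, i) =>
      put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes(name), Bytes.toBytes(record.get(i).toString))
    }
    buffer += put
  }

  override def commit(): WriterCommitMessage = {
    // nothing reaches HBase until the task commits
    import scala.collection.JavaConverters._
    table.put(buffer.asJava)
    HBaseCommitMessage(buffer.size)
  }

  override def abort(): Unit = {
    // nothing was written yet, so discarding the buffer is enough
    buffer.clear()
  }
}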
Usage
// Data source 1
Dataset<Row> load = spark.read()
  .format("top.kfly.hbase.source.HBaseCustomSource")
  .option(Constants.HBASE_TABlE_NAME, "flink:kfly_orders")
  .option(Constants.HBASE_TABLE_SCHEMA, "f1:userId,f1:goodsMoney,f1:orderNo,f1:goodId")
  .option(Constants.SPARK_SQL_TABlE_SCHEMA, "userId String,goodsMoney String,orderNo String,goodId int")
  .load()
  .select("goodsMoney", "orderNo", "goodId")
  .filter("goodId <= 16");

Note

When a column is of type short or byte, the predicate will not be pushed down; it needs an explicit cast to smallint, as shown below:

select * from t where id = cast(2 as smallint)
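
The same applies when filtering through the DataFrame/Dataset API (df, t and id below are illustrative, matching the statement above):

// explicit cast so the predicate can still be pushed down
df.filter("id = cast(2 as smallint)")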