HBase Coprocessor的实现与应用

原文:叶铿 烽火大数据平台 研发负责人 转发

Coprocessor简介

  • HBase协处理器的灵感来自于Jeff Dean 09年的演讲,根据该演讲实现类似于Bigtable的协处理器,包括以下特性:每个表服务器的任意子表都可以运行代码客户端的高层调用接口(客户端能够直接访问数据表的行地址,多行读写会自动分片成多个并行的RPC调用),提供一个非常灵活的、可用于建立分布式服务的数据模型,能够自动化扩展、负载均衡、应用请求路由。HBase的协处理器灵感来自Bigtable,但是实现细节不尽相同。HBase建立框架为用户提供类库和运行时环境,使得代码能够在HBase Region Server和Master上面进行处理。

实现目的

  • 1.HBase无法轻易建立“二级索引”;
  • 2.执行求和、计数、排序等操作比较困难,必须通过MapReduce/Spark实现,对于简单的统计或聚合计算,可能会因为网络与IO开销大而带来性能问题。

灵感来源

  • 灵感来源于Bigtable的协处理器,包含如下特性:
    • 1.每个表服务器的任意子表都可以运行代码;
    • 2.客户端能够直接访问数据表的行,多行读写会自动分片成多个并行的RPC调用。

提供接口

  • 1.RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等
  • 2.WALObserver:提供WAL相关操作钩子;
  • 3.MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等;
  • 4.Endpoint:终端是动态RPC插件的接口,它的实现代码被安装在服务器端,能够通过HBase RPC调用唤醒。

应用范围

  • Apache HBase实战技术总结–中国HBase技术社区
    • 1.通过使用RegionObserver接口可以实现二级索引的创建和维护;
    • 2.通过使用Endpoint接口,在对数据进行简单排序和sum,count等统计操作时,能够极大提高性能。

本文将通过具体实例来演示两种协处理器的开发方法的详细实现过程。

EndPoint实现

  • 在传统关系型数据库里面,可以随时的对某列进行求和sum,但是目前HBase目前所提供的接口,直接求和是比较困难的,所以先编写好服务端代码,并加载到对应的Table上,加载协处理器有几种方法,可以通过HTableDescriptor的add Coprocessor方法直接加载,同理也可以通过removeCoprocessor方法卸载协处理器。
  • Endpoint协处理器类似传统数据库的存储过程,客户端调用Endpoint协处理器执行一段Server端代码,并将Server端代码的结果返回给Client进一步处理,最常见的用法就是进行聚合操作。举个例子说明:如果没有协处理器,当用户需要找出一张表中的最大数据即max聚合操作,必须进行全表扫描,客户端代码遍历扫描结果并执行求max操作,这样的方法无法利用底层集群的并发能力,而将所有计算都集中到Client端统一执行,效率非常低。但是使用Coprocessor,用户将求max的代码部署到HBase Server端,HBase将利用底层Cluster的多个节点并行执行求max的操作即在每个Region范围内执行求最大值逻辑,将每个Region的最大值在Region Server端计算出,仅仅将该max值返回给客户端。客户端进一步将多个Region的max进一步处理而找到其中的max,这样整体执行效率提高很多。但是一定要注意的是Coprocessor一定要写正确,否则导致RegionServer宕机

HBase-EndPoint

Protobuf定义

  • 如前所述,客户端和服务端之间需要进行RPC通信,所以两者间需要确定接口,当前版本的HBase的协处理器是通过Google Protobuf协议来实现数据交换的,所以需要通过Protobuf来定义接口。如下所示:
option java_package = "com.my.hbase.protobuf.generated";
option java_outer_classname = "AggregateProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "Client.proto";

message AggregateRequest {
    required string interpreter_class_name = 1;
    required Scan scan = 2;
    optional bytes  interpreter_specific_bytes = 3;
}

message AggregateResponse {
    repeated bytes first_part = 1;
    optional bytes second_part = 2;
}

service AggregateService {
    rpc GetMax (AggregateRequest) returns (AggregateResponse);
    rpc GetMin (AggregateRequest) returns (AggregateResponse);
    rpc GetSum (AggregateRequest) returns (AggregateResponse);
    rpc GetRowNum (AggregateRequest) returns (AggregateResponse);
    rpc GetAvg (AggregateRequest) returns (AggregateResponse);
    rpc GetStd (AggregateRequest) returns (AggregateResponse);
    rpc GetMedian (AggregateRequest) returns (AggregateResponse);
}
  • 可以看到这里定义7个聚合服务RPC,名字分别叫做GetMax、GetMin、GetSum等,本文通过GetSum进行举例,其他的聚合RPC也是类似的内部实现。RPC有一个入口参数,用消息AggregateRequest表示;RPC的返回值用消息AggregateResponse表示。Service是一个抽象概念,RPC的Server端可以看作一个用来提供服务的Service。在HBaseCoprocessor中Service就是Server端需要提供的EndpointCoprocessor服务,主要用来给HBase的Client提供服务。AggregateService.java是由Protobuf软件通过终端命令protoc filename.proto --java_out=OUT_DIR自动生成的,其作用是将.proto文件定义的消息结构以及服务转换成对应接口的RPC实现,其中包括如何构建request消息和response响应以及消息包含的内容的处理方式,并且将AggregateService包装成一个抽象类,具体的服务以类的方法的形式提供。AggregateService.java定义Client端与Server端通信的协议,代码中包含请求信息结构AggregateRequest、响应信息结构AggregateResponse、 提 供 的 服 务 种 类AggregateService,其中AggregateRequest中的interpreter_class_name指的是column interpreter的类名,此类的作用在于将数据格式从存储类型解析成所需类型。AggregateService.java由于代码太长,在这里就不贴出来了。

  • 下面我们来讲一下服务端的架构:

  • 首先,EndpointCoprocessor是一个Protobuf Service的实现,因此需要它必须继承某个ProtobufService。我们在前面已经通过proto文件定义Service,命名为AggregateService,因此Server端代码需要重载该类,其次作为HBase的协处理器,Endpoint 还必须实现HBase定义的协处理器协议,用Java的接口来定义。具体来说就是CoprocessorService和Coprocessor,这些HBase接口负责将协处理器和HBase 的RegionServer等实例联系起来以便协同工作。Coprocessor接口定义两个接口函数:start和stop。

  • 加载Coprocessor之后Region打开的时候被RegionServer自动加载,并会调用器start 接口完成初始化工作。一般情况该接口函数仅仅需要将协处理器的运行上下文环境变量CoprocessorEnvironment保存到本地即可。

  • CoprocessorEnvironment保存协处理器的运行环境,每个协处理器都是在一个RegionServer进程内运行并隶属于某个Region。通过该变量获取Region的实例等HBase运行时环境对象。

  • Coprocessor接口还定义stop()接口函数,该函数在Region被关闭时调用,用来进行协处理器的清理工作。本文里我们没有进行任何清理工作,因此该函数什么也不干。我们的协处理器还需要实现CoprocessorService接口。该接口仅仅定义一个接口函数getService()。我们仅需要将本实例返回即可。

  • HBase的Region Server在接收到客户端的调用请求时,将调用该接口获取实现RPCService的实例,因此本函数一般情况下就是返回自身实例即可。完成以上三个接口函数之后,Endpoint的框架代码就已完成。每个Endpoint协处理器都必须实现这些框架代码而且写法雷同。

    Endpoint Service端实现

  • Server端的代码就是一个Protobuf RPC的Service实现,即通过Protobuf提供的某种服务。其开发内容主要包括

    • 实现Coprocessor的基本框架代码
    • 实现服务的RPC具体代码

Endpoint 协处理的基本框架

  • Endpoint 是一个Server端Service的具体实现,其实现有一些框架代码,这些框架代码与具体的业务需求逻辑无关。仅仅是为了和HBase运行时环境协同工作而必须遵循和完成的一些粘合代码。因此多数情况下仅仅需要从一个例子程序拷贝过来并进行命名修改即可。不过我们还是完整地对这些粘合代码进行粗略的讲解以便更好地理解代码

未完待续