HBase协处理器同步二级索引到Solr(续)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了HBase协处理器同步二级索引到Solr(续),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含11330字,纯文字阅读大概需要17分钟。
内容图文
二、解决思路
三、代码
3.1 读取config文件内容
3.2 封装SolrServer的获取方式
3.3 编写提交数据到Solr的代码
3.4 拦截HBase的Put和Delete操作信息
四、 使用
一、 已知的问题和不足
在上一个版本中,实现了使用HBase的协处理器将HBase的二级索引同步到Solr中,但是仍旧有几个缺陷:
- 写入Solr的Collection是写死在代码里面,且是唯一的。如果我们有一张表的数据希望将不同的字段同步到Solr中该如何做呢?
- 目前所有配置相关信息都是写死到了代码中的,是否可以添加外部配置文件。
- 原来的方法是每次都需要编译新的Jar文件单独运行,能否将所有的同步使用一段通用的代码完成?
二、解决思路
针对上面的三个主要问题,我们一一解决
- 通常一张表会对应多个SolrCollection以及不同的Column。我们可以使用
Map[表名->List[(Collection1,List[Columns]),(Collection2,List[Columns])...]]
这样的类型,根据表名获取所有的Collection和Column。 - 通过Typesafe Config读取外部配置文件,达到所有信息可配的目的。
- 所有的数据都只有Put和Delete,只要我们拦截到具体的消息之后判断当前的表名,然后根据问题一中的Collection和Column即可写入对应的SolrServer。在协处理器中获取表名的是
e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()
其中e是ObserverContext
三、代码
3.1 读取config文件内容
使用typesafe的config组件读取morphlines.conf文件,将内容转换为 Map<String,List<HBaseIndexerMappin>>
。具体代码如下
public class ConfigManager {
private static SourceConfig sourceConfig = new SourceConfig ();
public static Config config ;
static {
sourceConfig . setConfigFiles ( "morphlines.conf" );
config = sourceConfig . getConfig ();
}
public static Map < String , List < HBaseIndexerMappin >> getHBaseIndexerMappin (){
Map < String , List < HBaseIndexerMappin >> mappin = new HashMap < String , List < HBaseIndexerMappin >>();
Config mappinConf = config . getConfig ( "Mappin" );
List < String > tables = mappinConf . getStringList ( "HBaseTables" );
for ( String table : tables ){
List < Config > confList = ( List < Config >) mappinConf . getConfigList ( table );
List < HBaseIndexerMappin > maps = new LinkedList < HBaseIndexerMappin >();
for ( Config tmp : confList ){
HBaseIndexerMappin map = new HBaseIndexerMappin ();
map . solrConnetion = tmp . getString ( "SolrCollection" );
map . columns = tmp . getStringList ( "Columns" );
maps . add ( map );
}
mappin . put ( table , maps );
}
return mappin ;
}
}
3.2 封装SolrServer的获取方式
因为目前我使用的环境是Solr和HBase公用的同一套Zookeeper,因此我们完全可以借助HBase的Zookeeper信息。HBase的协处理器是运行在HBase的环境中的,自然可以通过HBase的Configuration获取当前的Zookeeper节点和端口,然后轻松的获取到Solr的地址。
public class SolrServerManager implements LogManager {
static Configuration conf = HBaseConfiguration . create ();
public static String ZKHost = conf . get ( "hbase.zookeeper.quorum" , "bqdpm1,bqdpm2,bqdps2" );
public static String ZKPort = conf . get ( "hbase.zookeeper.property.clientPort" , "2181" );
public static String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr" ;
public static int zkClientTimeout = 1800000 ; // 心跳
public static int zkConnectTimeout = 1800000 ; // 连接时间
public static CloudSolrServer create ( String defaultCollection ){
log . info ( "Create SolrCloudeServer .This collection is " + defaultCollection );
CloudSolrServer solrServer = new CloudSolrServer ( SolrUrl );
solrServer . setDefaultCollection ( defaultCollection );
solrServer . setZkClientTimeout ( zkClientTimeout );
solrServer . setZkConnectTimeout ( zkConnectTimeout );
return solrServer ;
}
}
3.3 编写提交数据到Solr的代码
理想状态下,我们时时刻刻都需要提交数据到Solr中,但是事实上我们数据写入的时间是比较分散的,可能集中再每一天的某几个时间点。因此我们必须保证在高并发下能达到一定数据量自动提交,在低并发的情况下能隔一段时间写入一次。只有两种机制并存的情况下才能保证数据能即时写入。
public class SolrCommitTimer extends TimerTask implements LogManager {
public Map < String , List < SolrInputDocument >> putCache = new HashMap < String , List < SolrInputDocument >>(); //Collection名字->更新(插入)操作缓存
public Map < String , List < String >> deleteCache = new HashMap < String , List < String >>(); //Collection名字->删除操作缓存
Map < String , CloudSolrServer > solrServers = new HashMap < String , CloudSolrServer >(); //Collection名字->SolrServers
int maxCache = ConfigManager . config . getInt ( "MaxCommitSize" );
// 任何时候,保证只能有一个线程在提交索引,并清空集合
final static Semaphore semp = new Semaphore ( 1 );
//添加Collection和SolrServer
public void addCollecttion ( String collection , CloudSolrServer server ){
this . solrServers . put ( collection , server );
}
//往Solr添加(更新)数据
public UpdateResponse put ( CloudSolrServer server , SolrInputDocument doc ) throws IOException , SolrServerException {
server . add ( doc );
return server . commit ( false , false );
}
//往Solr添加(更新)数据
public UpdateResponse put ( CloudSolrServer server , List < SolrInputDocument > docs ) throws IOException , SolrServerException {
server . add ( docs );
return server . commit ( false , false );
}
//根据ID删除Solr数据
public UpdateResponse delete ( CloudSolrServer server , String rowkey ) throws IOException , SolrServerException {
server . deleteById ( rowkey );
return server . commit ( false , false );
}
//根据ID删除Solr数据
public UpdateResponse delete ( CloudSolrServer server , List < String > rowkeys ) throws IOException , SolrServerException {
server . deleteById ( rowkeys );
return server . commit ( false , false );
}
//将doc添加到缓存
public void addPutDocToCache ( String collection , SolrInputDocument doc ) throws IOException , SolrServerException , InterruptedException {
semp . acquire ();
log . debug ( "addPutDocToCache:" + "collection=" + collection + "data=" + doc . toString ());
if (! putCache . containsKey ( collection )){
List < SolrInputDocument > docs = new LinkedList < SolrInputDocument >();
docs . add ( doc );
putCache . put ( collection , docs );
} else {
List < SolrInputDocument > cache = putCache . get ( collection );
cache . add ( doc );
if ( cache . size () >= maxCache ) {
try {
this . put ( solrServers . get ( collection ), cache );
} finally {
putCache . get ( collection ). clear ();
}
}
}
semp . release (); //释放信号量
}
//添加删除操作到缓存
public void addDeleteIdCache ( String collection , String rowkey ) throws IOException , SolrServerException , InterruptedException {
semp . acquire ();
log . debug ( "addDeleteIdCache:" + "collection=" + collection + "rowkey=" + rowkey );
if (! deleteCache . containsKey ( collection )){
List < String > rowkeys = new LinkedList < String >();
rowkeys . add ( rowkey );
deleteCache . put ( collection , rowkeys );
} else {
List < String > cache = deleteCache . get ( collection );
cache . add ( rowkey );
if ( cache . size () >= maxCache ) {
try {
this . delete ( solrServers . get ( collection ), cache );
} finally {
putCache . get ( collection ). clear ();
}
}
}
semp . release (); //释放信号量
}
@Override
public void run () {
try {
semp . acquire ();
log . debug ( "开始插入...." );
Set < String > collections = solrServers . keySet ();
for ( String collection : collections ){
if ( putCache . containsKey ( collection ) && (! putCache . get ( collection ). isEmpty ()) ){
this . put ( solrServers . get ( collection ), putCache . get ( collection ));
putCache . get ( collection ). clear ();
}
if ( deleteCache . containsKey ( collection ) && (! deleteCache . get ( collection ). isEmpty ())){
this . delete ( solrServers . get ( collection ), deleteCache . get ( collection ));
deleteCache . get ( collection ). clear ();
}
}
} catch ( InterruptedException e ) {
e . printStackTrace ();
} catch ( Exception e ) {
log . error ( "Commit putCache to Solr error!Because :" + e . getMessage ());
} finally {
semp . release (); //释放信号量
}
}
}
3.4 拦截HBase的Put和Delete操作信息
在每个prePut和preDelete中拦截操作信息,记录表名、列名、值。将这些信息根据表名和Collection名进行分类写入缓存。
public class HBaseIndexerToSolrObserver extends BaseRegionObserver implements LogManager {
Map < String , List < HBaseIndexerMappin >> mappins = ConfigManager . getHBaseIndexerMappin ();
Timer timer = new Timer ();
int maxCommitTime = ConfigManager . config . getInt ( "MaxCommitTime" ); //最大提交时间,s
SolrCommitTimer solrCommit = new SolrCommitTimer ();
public HBaseIndexerToSolrObserver (){
log . info ( "Initialization HBaseIndexerToSolrObserver ..." );
for ( Map . Entry < String , List < HBaseIndexerMappin >> entry : mappins . entrySet () ){
List < HBaseIndexerMappin > solrmappin = entry . getValue ();
for ( HBaseIndexerMappin map : solrmappin ){
String collection = map . solrConnetion ; //获取Collection名字
log . info ( "Create Solr Server connection .The collection is " + collection );
CloudSolrServer solrserver = SolrServerManager . create ( collection ); //根据Collection初始化SolrServer连接
solrCommit . addCollecttion ( collection , solrserver );
}
}
timer . schedule ( solrCommit , 10 * 1000L , maxCommitTime * 1000L );
}
@Override
public void postPut ( ObserverContext < RegionCoprocessorEnvironment > e ,
Put put , WALEdit edit , Durability durability ) throws IOException {
String table = e . getEnvironment (). getRegion (). getTableDesc (). getTableName (). getNameAsString (); //获取表名
String rowkey = Bytes . toString ( put . getRow ()); //获取主键
SolrInputDocument doc = new SolrInputDocument ();
List < HBaseIndexerMappin > mappin = mappins . get ( table );
for ( HBaseIndexerMappin mapp : mappin ){
for ( String column : mapp . columns ){
String [] tmp = column . split ( ":" );
String cf = tmp [ 0 ];
String cq = tmp [ 1 ];
if ( put . has ( Bytes . toBytes ( cf ), Bytes . toBytes ( cq ))){
Cell cell = put . get ( Bytes . toBytes ( cf ), Bytes . toBytes ( cq )). get ( 0 ); //获取制定列的数据
Map < String , String > operation = new HashMap < String , String >();
operation . put ( "set" , Bytes . toString ( CellUtil . cloneValue ( cell )));
doc . setField ( cq , operation ); //使用原子更新的方式将HBase二级索引写入Solr
}
}
doc . addField ( "id" , rowkey );
try {
solrCommit . addPutDocToCache ( mapp . solrConnetion , doc ); //添加doc到缓存
} catch ( SolrServerException e1 ) {
e1 . printStackTrace ();
} catch ( InterruptedException e1 ) {
e1 . printStackTrace ();
}
}
}
@Override
public void postDelete ( ObserverContext < RegionCoprocessorEnvironment > e ,
Delete delete ,
WALEdit edit ,
Durability durability ) throws IOException {
String table = e . getEnvironment (). getRegion (). getTableDesc (). getTableName (). getNameAsString ();
String rowkey = Bytes . toString ( delete . getRow ());
List < HBaseIndexerMappin > mappin = mappins . get ( table );
for ( HBaseIndexerMappin mapp : mappin ){
try {
solrCommit . addDeleteIdCache ( mapp . solrConnetion , rowkey ); //添加删除操作到缓存
} catch ( SolrServerException e1 ) {
e1 . printStackTrace ();
} catch ( InterruptedException e1 ) {
e1 . printStackTrace ();
}
}
}
}
四、 使用
首先需要添加morphlines.conf文件。里面包含了需要同步数据到Solr的HBase表名、对应的Solr Collection的名字、要同步的列、多久提交一次、最大批次容量的相关信息。具体配置如下:
#最大提交时间(单位:秒)
MaxCommitTime = 30
#最大批次容量
MaxCommitSize = 10000
Mappin {
HBaseTables : [ "HBASE_OBSERVER_TEST" ] #需要同步的HBase表名
"HBASE_OBSERVER_TEST" : [
{
SolrCollection : "bqjr" #Solr Collection名字
Columns : [
"cf1:test_age" , #需要同步的列,格式<列族:列>
"cf1:test_name"
]
},
]
}
该配置文件默认放在各个节点的/etc/hbase/conf/
下。如果你希望将配置文件路径修改为其他路径,请修改com.bqjr.bigdata.HBaseObserver.comm.config.SourceConfig类中的configHome路径。
然后将代码打包,上传到HDFS中,将协处理器添加到对应的表中。
#先禁用这张表
disable ‘HBASE_OBSERVER_TEST‘
#为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)
alter ‘HBASE_OBSERVER_TEST‘ , ‘coprocessor‘ => ‘hdfs://hostname:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||‘
#启用这张表
enable ‘HBASE_OBSERVER_TEST‘
#删除某个协处理器,"$<bumber>"后面跟的ID号与desc里面的ID号相同
alter ‘HBASE_OBSERVER_TEST‘ , METHOD => ‘table_att_unset‘ , NAME => ‘coprocessor$1‘
如果需要新增一张表同步到Solr。只需要修改morphlines.conf文件,分发倒各个节点。然后将协处理器添加到HBase表中,这样就不用再次修改代码了。
原文:http://www.cnblogs.com/kekukekro/p/6478794.html
内容总结
以上是互联网集市为您收集整理的HBase协处理器同步二级索引到Solr(续)全部内容,希望文章能够帮你解决HBase协处理器同步二级索引到Solr(续)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。