Importing into HBase with Sqoop, and Syncing the Index to ES via an HBase Coprocessor
Published: 2019-06-13


1. Environment

  • MySQL 5.6
  • Sqoop 1.4.6
  • Hadoop 2.5.2
  • HBase 0.98
  • Elasticsearch 2.3.5

2. Installation (skipped)

3. HBase Coprocessor Implementation

HBase Observer

The Observer hooks the RegionServer's postPut and postDelete events: each Put is flattened into a JSON document keyed by the HBase row key and queued as an Elasticsearch upsert, and each Delete queues the matching delete request. Both go through the bulk helper shown in the second listing.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DataSyncObserver extends BaseRegionObserver {

    private static Client client = null;
    private static final Log LOG = LogFactory.getLog(DataSyncObserver.class);

    /**
     * Read the arguments passed in from the HBase shell 'alter' command
     *
     * @param env coprocessor environment carrying the configuration
     */
    private void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        Config.clusterName = conf.get("es_cluster");
        Config.nodeHost = conf.get("es_host");
        Config.nodePort = conf.getInt("es_port", -1);
        Config.indexName = conf.get("es_index");
        Config.typeName = conf.get("es_type");
        LOG.info("observer -- started with config: " + Config.getInfo());
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        readConfiguration(env);
        client = MyTransportClient.client;
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
                        WALEdit edit, Durability durability) throws IOException {
        try {
            String indexId = new String(put.getRow());
            Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            // Flatten every cell of the Put into one flat JSON document
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            ElasticSearchOperator.addUpdateBuilderToBulk(
                    client.prepareUpdate(Config.indexName, Config.typeName, indexId)
                            .setDoc(json).setUpsert(json));
            LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
                           final Delete delete, final WALEdit edit,
                           final Durability durability) throws IOException {
        try {
            String indexId = new String(delete.getRow());
            ElasticSearchOperator.addDeleteBuilderToBulk(
                    client.prepareDelete(Config.indexName, Config.typeName, indexId));
            LOG.info("observer -- delete a doc: " + indexId);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }
}
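
The Observer above and the bulk helper below both reference two classes the post never shows: Config and MyTransportClient. A minimal sketch of what they could look like, with field names inferred from readConfiguration() and the client built with the Elasticsearch 2.3.x TransportClient API; everything here is an assumption, not the author's original code:

// Config.java -- holds the settings parsed from the coprocessor arguments (assumed shape)
public class Config {
    public static String clusterName;
    public static String nodeHost;
    public static int nodePort;
    public static String indexName;
    public static String typeName;

    public static String getInfo() {
        return "cluster=" + clusterName + ", node=" + nodeHost + ":" + nodePort
                + ", index=" + indexName + ", type=" + typeName;
    }
}

// MyTransportClient.java -- one shared TransportClient; the static initializer only
// runs once the class is first touched in start(), i.e. after Config is populated
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class MyTransportClient {
    public static final TransportClient client = build();

    private static TransportClient build() {
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", Config.clusterName).build();
        try {
            return TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(Config.nodeHost), Config.nodePort));
        } catch (UnknownHostException e) {
            throw new RuntimeException("cannot resolve ES host " + Config.nodeHost, e);
        }
    }
}

If you would rather not depend on static-initialization order, the client construction can equally be moved into the Observer's start() method.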

ES helper class

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ElasticSearchOperator {

    // Buffer capacity: flush once more than this many actions are queued
    private static final int MAX_BULK_COUNT = 10;
    // Maximum commit interval (seconds)
    private static final int MAX_COMMIT_INTERVAL = 60 * 5;

    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;
    private static Lock commitLock = new ReentrantLock();

    static {
        // Elasticsearch 2.3.5
        client = MyTransportClient.client;
        bulkRequestBuilder = client.prepareBulk();
        bulkRequestBuilder.setRefresh(true);
        Timer timer = new Timer();
        timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000);
    }

    /**
     * Submit the buffer as a bulk request once it holds more than `threshold` actions;
     * on success, start a fresh buffer (failed batches stay and are retried next flush)
     *
     * @param threshold minimum number of queued actions required to flush
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkResponse.hasFailures()) {
                bulkRequestBuilder = client.prepareBulk();
            }
        }
    }

    /**
     * Add an index (upsert) request to the buffer
     *
     * @param builder update request prepared by the Observer
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Add a delete request to the buffer
     *
     * @param builder delete request prepared by the Observer
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Periodic flush, so that Elasticsearch does not fall behind HBase
     * when the RegionServer sees no writes for a long time
     */
    static class CommitTimer extends TimerTask {
        @Override
        public void run() {
            commitLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitLock.unlock();
            }
        }
    }
}

Package and upload to HDFS

mvn clean compile assembly:single
mv observer-1.0-SNAPSHOT-jar-with-dependencies.jar observer-hb0.98-es2.3.5.jar
hdfs dfs -put observer-hb0.98-es2.3.5.jar /hbase/lib/
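
The assembly:single goal presupposes that the Maven Assembly Plugin is configured in the project's pom.xml with the jar-with-dependencies descriptor. The post does not show its pom, so the following fragment is an assumed minimal setup:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>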

4. Create the HBase Tables and Enable the Coprocessor

The coprocessor attribute is pipe-separated: HDFS path of the jar | Observer class | priority | key=value arguments. The key=value arguments are exactly what readConfiguration() reads back on the RegionServer.

MySQL

hbase shell
create 'region','data'
disable 'region'
alter 'region', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=mysql_region,es_index=hbase,es_port=9300,es_host=localhost'
enable 'region'

Oracle

create 'sp','data'
disable 'sp'
alter 'sp', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=oracle_sp,es_index=hbase,es_port=9300,es_host=localhost'
enable 'sp'

Inspect

hbase(main):007:0* describe 'ora_test'
Table ora_test is ENABLED
ora_test, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///appdt/hbase/lib/observer-hb1.2.2-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=ora_test,es_index=hbase,es_port=9300,es_host=localhost'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0260 seconds

Removing the Coprocessor

disable 'ora_test'
alter 'ora_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable 'ora_test'

Verify the removal

hbase(main):011:0> describe 'ora_test'
Table ora_test is ENABLED
ora_test
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0200 seconds

5. Load Data with Sqoop

MySQL

bin/sqoop import --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username mysql --password 111111 --table TB_REGION --hbase-table region --hbase-row-key REGION_ID --column-family data
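
Note that this import goes through the RegionServers' normal Put path, which is exactly what lets the Observer fire. If your Sqoop build supports the --hbase-bulkload option, avoid it in this setup: bulk loading writes HFiles directly and bypasses the coprocessor hooks, so bulk-loaded rows would never be synced to ES.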

Oracle

bin/sqoop import --connect jdbc:oracle:thin:@192.168.16.223:1521/orcl --username sitts --password password --table SITTS.ESB_SERVICE_PARAM --split-by PARAM_ID --hbase-table sp --hbase-row-key PARAM_ID --column-family data

6. Verification

HBase

scan 'region'

ES
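
A quick way to check the ES side over the HTTP API (a sketch: the index and type names are the ones registered in section 4, and the default HTTP port 9200 is assumed):

curl 'http://localhost:9200/hbase/mysql_region/_search?pretty&q=*:*'
curl 'http://localhost:9200/hbase/_count?pretty'

The document count returned should match the number of rows Sqoop imported, once the bulk buffer has flushed.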


7. Notes

  • Use one index per Coprocessor; different tables can be given different types, otherwise the index will get mixed up.
  • After changing the Java code, the jar uploaded to HDFS must differ from the previous one (at least in name); otherwise, even unloading the old coprocessor and reinstalling it will not pick up the new code.
  • If several tables map to several indices/types, each table's Observer must be loaded from its own jar path, otherwise Elasticsearch will cross data between them; see the sketch below.
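
A hypothetical way to satisfy the last point, reusing the table and argument names from section 4 (the per-table jar names are made up for illustration): keep a separate copy of the jar per table on HDFS and register each Observer against its own copy.

hdfs dfs -cp /hbase/lib/observer-hb0.98-es2.3.5.jar /hbase/lib/observer-region.jar
hdfs dfs -cp /hbase/lib/observer-hb0.98-es2.3.5.jar /hbase/lib/observer-sp.jar
# in the hbase shell (disable each table before alter, enable afterwards, as in section 4):
alter 'region', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-region.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=mysql_region,es_index=hbase,es_port=9300,es_host=localhost'
alter 'sp', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-sp.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=oracle_sp,es_index=hbase,es_port=9300,es_host=localhost'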

Reposted from: https://www.cnblogs.com/itboys/p/9520389.html
