龙空技术网

大数据开发|HBase API 详细例子(封装的DAO类)

无极低码 374

前言:

如今我们对“importnet”大概比较关注,你们都想要学习一些“importnet”的相关内容。那么小编也在网络上汇集了一些对于“importnet””的相关知识,希望姐妹们能喜欢,同学们一起来学习一下吧!

HBase中没有库的概念

HBase lib目录下所有JAR包复制到项目中,Hbase 版本0.98.5

package com.zxing.imgQRCode;

import java.io.IOException;

import java.util.LinkedList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.MasterNotRunningException;

import org.apache.hadoop.hbase.ZooKeeperConnectionException;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.HConnection;

import org.apache.hadoop.hbase.client.HConnectionManager;

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;

import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;

public class HbaseConnection {

private String rootDir;

private String zkServer;

private String port;

private Configuration conf;

private HConnection hConn=null;

public HbaseConnection(String rootDir, String zkServer, String port) {

super();

this.rootDir = rootDir;

this.zkServer = zkServer;

this.port = port;

conf=HBaseConfiguration.create();

conf.set("hbase.rootdir", rootDir);

conf.set("hbase.zookeeper.quorum ", zkServer);

conf.set("hbase.zookeeper.property.clientPort", port);

try {

hConn=HConnectionManager.createConnection(conf);

} catch (IOException e) {

e.printStackTrace();

}

}

//创建表

public void crateTable(String tableName,List<String> cols){

try {

HBaseAdmin admin=new HBaseAdmin(conf);

if(admin.tableExists(tableName))

throw new IOException("table exists");

else{

HTableDescriptor tableDesc=new HTableDescriptor(tableName);

for(String col:cols){

HColumnDescriptor colDesc=new HColumnDescriptor(col);

colDesc.setCompressionType(Algorithm.GZ);

colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);

tableDesc.addFamily(colDesc);

}

admin.createTable(tableDesc);

}

} catch (MasterNotRunningException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (ZooKeeperConnectionException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (IOException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

//插入数据

public void saveData(String tableName,List<Put> puts){

try {

HTableInterface table =hConn.getTable(tableName);

table.put(puts);

table.setAutoFlush(false);

table.flushCommits();

} catch (IOException e) {

e.printStackTrace();

}

}

//得到数据

public Result getData(String tableName,String rowkey){

try {

HTableInterface table =hConn.getTable(tableName);

Get get=new Get(rowkey.getBytes());

return table.get(get);

} catch (IOException e) {

e.printStackTrace();

}

return null;

}

//输出result结果

public void format(Result result){

String rowkey=Bytes.toString(result.getRow());

KeyValue[] kvs=result.raw();

for (KeyValue kv:kvs){

String family= Bytes.toString(kv.getFamily());

String qualifier= Bytes.toString(kv.getQualifier());

System.out.println("rowkey->"+rowkey+"family->"+family+"qualifier->"+qualifier);

}

}

//全表扫描

public void hbaseScan(String tableName){

Scan scan=new Scan();//扫描器

scan.setCaching(1000);//缓存1000条数据,一次读取1000条

try {

HTableInterface table =hConn.getTable(tableName);

ResultScanner scanner=table.getScanner(scan);//返回迭代器

for(Result res:scanner){

format(res);

}

} catch (IOException e) {

e.printStackTrace();

}

}

//比较过滤器

public void filterTest(String tableName){

Scan scan=new Scan();//扫描器

scan.setCaching(1000);//缓存1000条数据,一次读取1000条

RowFilter filter =new RowFilter(CompareFilter.CompareOp.EQUAL,new BinaryComparator("Jack".getBytes()));

RowFilter filter1 =new RowFilter(CompareFilter.CompareOp.EQUAL,new RegexStringComparator("J\\w+"));

scan.setFilter(filter);

try {

HTableInterface table =hConn.getTable(tableName);

ResultScanner scanner=table.getScanner(scan);//返回迭代器

for(Result res:scanner){

format(res);

}

} catch (IOException e) {

e.printStackTrace();

}

}

//PageFilter分页

public void pageFilterTest(String tableName){

PageFilter filter = new PageFilter(4);

byte[] lastRow=null;

int pageCount=0; //记录第几页

try {

HTableInterface table =hConn.getTable(tableName);

while(++pageCount>0){

System.out.println("pageCount = "+ pageCount);

Scan scan=new Scan();

scan.setFilter(filter);

if(lastRow!=null){

scan.setStartRow(lastRow);

}

ResultScanner scanner=table.getScanner(scan);

int count=0;//计数器

for(Result res:scanner){

lastRow=res.getRow();

if(++count>3)

break;

format(res);

if(count<3){

break;

}

}

}

} catch (IOException e) {

e.printStackTrace();

}

}

public static void main(String[] args) {

String rootDir="hdfs://ns1/hbase";

String zkServer="10.128.129.230";//集群内网IP

String port="2181";

//

HbaseConnection conn=new HbaseConnection(rootDir, zkServer, port);

List<String> cols=new LinkedList<String>();

cols.add("basicInfo");

cols.add("moreInfo");

conn.crateTable("students", cols);

//

List<Put> puts=new LinkedList<Put>();

Put put1=new Put("Tom".getBytes());//rowkey

put1.add("basicInfo".getBytes(), "age".getBytes(), "27".getBytes());

put1.add("moreInfo".getBytes(), "tel".getBytes(), "110".getBytes());

Put put2=new Put("Jim".getBytes());

put2.add("basicInfo".getBytes(), "age".getBytes(), "28".getBytes());

put2.add("moreInfo".getBytes(), "tel".getBytes(), "111".getBytes());

puts.add(put1);

puts.add(put2);

conn.saveData("students", puts);

//

Result result= conn.getData("students", "Tom");

conn.format(result);

//

conn.hbaseScan("students");

//

conn.filterTest("students");

//

conn.pageFilterTest("students");

}

}

常用接口

package test;

import hbase.HbaseUtils;

import java.io.IOException;

import java.util.Calendar;

import java.util.Date;

import java.util.Iterator;

import java.util.Map.Entry;

import java.util.concurrent.TimeUnit;

import net.sf.json.JSONObject;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.client.HTableInterface;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.filter.Filter;

import org.apache.hadoop.hbase.filter.PageFilter;

import org.apache.hadoop.hbase.util.Bytes;

import org.junit.Test;

import com.xd.iis.se.common.Constants;

import com.xd.iis.se.hbase.CommHbaseUtils;

import com.xd.iis.se.hbutils.MeUtils;

import commn.CommonConstants;

public class SyncTestUtils {

//hbase表名(hbaseapi包中的Constants类中定义了表名和数字的映射关系)

private final static String wz_content="wz_content";//1

private final static String lt_content="lt_content";//4

private final static String wb_content="wb_content";//2

private static final String wb_comment="wb_comment";//45

private static final String sinawb_user="sinawb_user";// 微博用户表

/* TitleHs的定义在hbaseapi包中SwitchBeanAndJsonUtils类中jsonToDocument方法里

*

* 从326行代码开始

*

* hbase表字段定义hbaseapi包中HIContentCommon类

*

* pfsearch包中IContentCommon类

* */

@Test

public void hbaseTableNameToDigitalMapping() {

for(Entry<String, String> entry: Constants.rstypemp.entrySet()){

System.out.println(entry.getKey()+":"+entry.getValue());

}

}

@Test

public void seconds(){

System.out.println(new Date().getTime());

System.out.println(System.nanoTime());

System.out.println(System.currentTimeMillis());

//时间转换

System.out.println(TimeUnit.NANOSECONDS.convert(10L, TimeUnit.MILLISECONDS));

}

//毫秒转换成日期

@Test

public void millsToDate(){

String mills="1460459403324";

Date date=new Date(Long.parseLong(mills));

System.out.println(date);

System.out.println(date.getTime());

}

//手工干预生成19位的全网微博评论tokenKey(键,rowkey)=wbcomment_key

//TokenTable=hotmanwb_token

/*

* hbase(main):003:0> scan 'hotmanwb_token',LIMIT=>2

ROW COLUMN+CELL

hotmanwb_key column=content:date, timestamp=1459310036375, value=1459310031972331086

hotmanwbcomment_key column=content:date, timestamp=1460600117890, value=1460600079542140091

ltcomment_key column=content:date, timestamp=1460600483717, value=1460600441668719114

wzcomment_key column=content:date, timestamp=1460599817280, value=1460599777817713930

*/

@Test

public void generateTokenKeyForWeiboComment(){

Calendar calendar = Calendar.getInstance();

calendar.setTime(new Date());

//三十天前的时间

calendar.add(calendar.DATE, -30);

Date date = calendar.getTime();

//第一位表示星期

System.out.println(date);

//13位加6位拼成19位

String startTime=date.getTime()+"000000";

System.out.println("startTime:"+startTime);

//插入或者更新时间

HbaseUtils.insertData("hotmanwb_token", "wbcomment_key", startTime);

//1458109165143000000 19位

}

//hbase根据表名和rowkey查询一条数据(tokenkey)

@Test

public void findByRowKey(){

String startTime=HbaseUtils.QueryByCondition1("hotmanwb_token", "wbcomment_key");

System.out.println(startTime);

}

//hbase返回前几条数据

/*key:014604646505869913352145

key:014604646505954866550445

key:014604988869079841915645

key:014605014502935460283945

key:014605014503712711041745*/

@Test

public void scanTopRowComment(){

ResultScanner resultScanner = null;

HTableInterface table = HbaseUtils.pool.getTable(wz_content);

try {

Scan scan = new Scan();

//设置过滤器,只返回20条

Filter filter = new PageFilter(5);

scan.setFilter(filter);

//RegionServer是否应当缓存 当前客户端访问过的数据块 如果是随机的get 这个最好为false

scan.setCacheBlocks(true);

/*简而言之就是 batch 是qualifier column级别的 caching是row级别的

batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,

太大 会对客户端造成比较大的压力, 具体根据需要使用 , 正常使用可以不必管

它, 大批量读取可以考虑用它改善性能

这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,

setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier

(默认一个row的所有qualifier会在一个Result中)*/

/*scan.setBatch(100);*/ //setFilter与setBatch不能都打开,会冲突

//setCaching发给scanners的缓存的Row的数量

scan.setCaching(100);

scan.setMaxVersions(1);

resultScanner = table.getScanner(scan);

/* for (Result r : rs) {

return new String(r.getRow());

}*/

Iterator<Result> res = resultScanner.iterator();// 返回查询遍历器

while (res.hasNext()) {

Result result = res.next();

System.out.println(result);

System.out.println("key:" + new String(result.getRow()));

//date列存的是json字符串

String value = new String(result.getValue(

CommonConstants.CRAWLERCONTENT_TABLE_COLUMNS

.getBytes(),

CommonConstants.CRAWLERCONTENT_TABLE_COLUMN2

.getBytes()), "ISO8859-1");

System.out.println("value:" + value);

JSONObject js = JSONObject.fromObject(value);

System.out.println(js);

}

} catch (Exception e) {

e.printStackTrace();

}finally{

//这样一定要记住 用完close

if(resultScanner!=null) resultScanner.close();

}

}

//根据rowkey范围扫描

@Test

public void scanByRowKeyRangeComment(){

ResultScanner resultScanner = null;

HTableInterface table = HbaseUtils.pool.getTable(wb_comment);

String startRow="01420459403324297147";//

String stopRow="014605014503712711";//20位

try {

Scan scan = new Scan();

//设置过滤器,只返回20条

Filter filter = new PageFilter(5);

scan.setFilter(filter);

scan.setStartRow(startRow.getBytes());

scan.setStopRow(stopRow.getBytes());

//RegionServer是否应当缓存 当前客户端访问过的数据块 如果是随机的get 这个最好为false

scan.setCacheBlocks(true);

/*简而言之就是 batch 是qualifier column级别的 caching是row级别的

batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,

太大 会对客户端造成比较大的压力, 具体根据需要使用 , 正常使用可以不必管

它, 大批量读取可以考虑用它改善性能

这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,

setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier

(默认一个row的所有qualifier会在一个Result中)*/

/*scan.setBatch(100);*/ //setFilter与setBatch不能都打开,会冲突

//setCaching发给scanners的缓存的Row的数量

scan.setCaching(100);

scan.setMaxVersions(1);

resultScanner = table.getScanner(scan);

/* for (Result r : rs) {

return new String(r.getRow());

}*/

Iterator<Result> res = resultScanner.iterator();// 返回查询遍历器

while (res.hasNext()) {

Result result = res.next();

System.out.println(result);

System.out.println("key:" + new String(result.getRow()));

//date列存的是json字符串

String value = new String(result.getValue(

CommonConstants.CRAWLERCONTENT_TABLE_COLUMNS

.getBytes(),

CommonConstants.CRAWLERCONTENT_TABLE_COLUMN2

.getBytes()), "ISO8859-1");

System.out.println("value:" + value);

JSONObject js = JSONObject.fromObject(value);

System.out.println(js);

}

} catch (Exception e) {

e.printStackTrace();

}finally{

//这样一定要记住 用完close

if(resultScanner!=null) resultScanner.close();

}

}

@Test

//hbase生成行键(hbaseApi包) 第一个url参数无用

public void createRowKey(){

//typemp.put("wb_comment", "45");// 微博评论表对应编码最后两位

String newRowKey=MeUtils.createKeyCode("", "wb_comment");

System.out.println(newRowKey);

/*String oldRowKey=MeUtils.createKeyCode_oid("", "wb_comment");

System.out.println(oldRowKey);*/

//rowkey=114606860784008157170445 24位

//1+19位时间戳+2位随机数+2位表名

}

/*Timestamp

HBase通过row和column确定一份数据,这份数据的值可能有多个版本,不同版本的值按照时间倒序排序,即最新的数据排在最前面,

查询时默认返回最新版本。如上例中row key=1的author:nickname值有两个版本,分别为1317180070811对应的“一叶渡江”和1317180718830对应的“yedu”

(对应到实际业务可以理解为在某时刻修改了nickname为yedu,但旧值仍然存在)。Timestamp默认为系统当前时间(精确到毫秒),也可以在写入数据时指定该值。

Value

每个值通过4个键唯一索引,tableName+RowKey+ColumnKey+Timestamp=>value,

例如上例中{tableName=’blog’,RowKey=’1’,ColumnName=’author:nickname’,Timestamp=’ 1317180718830’}索引到的唯一值是“yedu”。

*/

/*大Solr(192.168.20.190对应三个域名)

# 24 index

solr_24h=

# month index

solr_month=

# week index

solr_week=

*/

@Test

public void scanTopRowContent(){

ResultScanner resultScanner = null;

HTableInterface table = HbaseUtils.pool.getTable(wz_content);

try {

Scan scan = new Scan();

//设置过滤器,只返回20条

Filter filter = new PageFilter(5);

scan.setFilter(filter);

//RegionServer是否应当缓存 当前客户端访问过的数据块 如果是随机的get 这个最好为false

scan.setCacheBlocks(true);

/*简而言之就是 batch 是qualifier column级别的 caching是row级别的

batch 就是每次迭代从服务器获取的记录数, 设置太小 会频繁到服务器取数据,

太大 会对客户端造成比较大的压力, 具体根据需要使用 , 正常使用可以不必管

它, 大批量读取可以考虑用它改善性能

这里要注意了: 这个记录数是qualifier不是row, 如果一个row有17个qualifier,

setBatch(5),一个row就会分散到4个Result中, 分别持有5,5,5,2个qualifier

(默认一个row的所有qualifier会在一个Result中)*/

/*scan.setBatch(100);*/ //setFilter与setBatch不能都打开,会冲突

//setCaching发给scanners的缓存的Row的数量

scan.setCaching(100);

scan.setMaxVersions(1);

resultScanner = table.getScanner(scan);

// 返回查询遍历器

Iterator<Result> res = resultScanner.iterator();

while (res.hasNext()) {

System.out.println("--------------行分割线-------------");

Result result = res.next();

System.out.println("\n"+"------单个result--------");

System.out.println(result);

System.out.println("\n"+"------result中Cells--------");

//由{row key, Family:Qualifier, version} 唯一确定的单元。cell中的数据是没有类型的,全部是以字节的形式进行存储的

for (Cell cell : result.rawCells()) {

//rowkey

System.out.println("Rowkey : " +Bytes.toString (CellUtil.cloneRow(cell)));

//列簇+列(Family是第一级列,Qualifier是第二级列)

System.out.println("Familiy:Quilifier : " +Bytes.toString (CellUtil.cloneFamily(cell))

+":"+Bytes.toString (CellUtil.cloneQualifier (cell)));

//值

System.out.println ("Value : " +Bytes.toString (CellUtil.cloneValue (cell)));

System.out.println("TimeStamp : " +cell.getTimestamp());

}

/* //老API

System.out.println("\n"+ "------result中KeyValues--------");

for( KeyValue kv:result.list()){

System.out.println(String.format("row:%s, family:%s, qualifier:%s, qualifiervalue:%s, timestamp:%s.",

Bytes.toString(kv.getRow()),

Bytes.toString(kv.getFamily()),

Bytes.toString(kv.getQualifier()),

Bytes.toString(kv.getValue()),

kv.getTimestamp()));

} */

}

} catch (Exception e) {

e.printStackTrace();

}finally{

//这样一定要记住 用完close

if(resultScanner!=null) resultScanner.close();

}

}

//SecureCRT上传下载文件

//sz 下载命令

//rz -be 上传文件 单独用rz会有两个问题:上传中断、上传文件变化(md5不同),

/*解决办法是上传是用rz -be,并且去掉弹出的对话框中“Upload files as ASCII”前的勾选。

-a, –ascii

-b, –binary 用binary的方式上传下载,不解释字符为ascii

-e, –escape 强制escape 所有控制字符,比如Ctrl+x,DEL等

rar,gif等文件文件采用 -b 用binary的方式上传。

文件比较大而上传出错的话,采用参数 -e*/

//根据rowkey查找数据

@Test

public void select(){

String ID="114615497672016941968326";

try {

String json=CommHbaseUtils.select(ID);

System.out.println(json);

JSONObject js = JSONObject.fromObject(json);

System.out.println(js);

} catch (IOException e) {

e.printStackTrace();

}

}

/*//血和泪的经验教训

ArrayList非线程安全,即使使用Collections.synchronizedList(new ArrayList<SolrInputDocument>())

访问方法size()方法得出来的大小也是错的,还是推荐使用vector代替

//因为solrserver服务的url配置文件pfs.properties未打包进去,找不到url发生空指针异常

ScheduledExecutorService对于线程中发生的http服务方面的异常无法捕获,jstack -l命令打印信息

java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)

- parking to wait for <0x0000000712e8e7e8> (a

解决方案:

异常可以替代使用Timer定时器来捕获

*/

/*修改properties文件编码

* 全局修改:

* window-> preference -> general -> content types

找到右边的 java properties file ,将其编码改为 utf-8

单个文件修改:

右击该properties文件--properties--Resource--Text file encoding,

选中other,选择其它编码方式,如UTF-8或GBK,这样就能在properties里面输入中文,而不会自动转成Unicode了。

* */

}

标签: #importnet