㈠ 有没有Python写的spark连接Hbase的例子
博主项目实践中,经常需要用Spark从Hbase中读取数据。其中,spark的版本为1.6,hbase的版本为0.98。现在记录一下如何在spark中操作读取hbase中的数据。
对于这种操作型的需求,没有什么比直接上代码更简单明了的了。so,show me the code!
object Demo extends Logging{
val CF_FOR_FAMILY_USER = Bytes.toBytes("U");
val CF_FOR_FAMILY_DEVICE = Bytes.toBytes("D")
val QF_FOR_MODEL = Bytes.toBytes("model")
val HBASE_CLUSTER = "hbase://xxx/"
val TABLE_NAME = "xxx";
val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAME
def genData(sc:SparkContext) = {
//20161229的数据,rowkey的设计为9999-yyyyMMdd
val filter_of_1229 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("79838770"))
//得到qf为w:00-23的数据
val filter_of_qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("w"))
val all_filters = new util.ArrayList[Filter]()
all_filters.add(filter_of_1229)
all_filters.add(filter_of_qf)
//hbase多个过滤器
val filterList = new FilterList(all_filters)
val scan = new Scan().addFamily(CF_FOR_FAMILY_USER)
scan.setFilter(filterList)
scan.setCaching(1000)
scan.setCacheBlocks(false)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,HBASE_TABLE )
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
//后面是针对hbase查询结果的具体业务逻辑
.map()
...
def main(args: Array[String]): Unit = {
val Array(output_path) = args
val sparkConf = new SparkConf().setAppName("demo")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(sparkConf)
genUuidWifi(sc).saveAsTextFile(output_path)
sc.stop()
}
}04142434445460414243444546
需要注意的一个小点就是如果hbase里有多个过滤器,注意需要使用FilterList。
㈡ 简述通过Apache+HBase+API进行表过滤器应用的理解
摘要 过滤器是HBase为客户端提供的一种高级API,是HBase的一种高级特性,它提供了非常强大的功能帮助用户处理表中的数据。HBase中读取数据的API主要是get()和scan(),它们都支持直接读取数据和通过指定起始行健访问数据的功能,可以通过添加限定条件如列族、列、时间戳等来限制查询的数量,但是它们缺少一种细粒度的的筛选功能,比如基于正则表达式的筛选。由此诞生过滤器,Get类和Scan类都支持过滤器,通过方法setFilter(Filter filter)可以设置查询时的过滤器。
㈢ hbase shell 中有版本过滤器吗
进入hbase shell console
$HBASE_HOME/bin/hbase shell
如果有kerberos认证,需要事先使用相应的keytab进行一下认证(使用kinit命令),认证成功之后再使用hbase shell进入可以使用whoami命令可查看当前用户!
㈣ hbase的过滤器有哪些
HBase为筛选数据提供了一组过滤器,通过这个过滤器可以在中的数据的多个维度(行,列,数据版本)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键,列明,时间戳定位)。通常来说,通过行键,值来筛选数据的应用场景较多。
1. RowFilter:筛选出匹配的所有的行,对于这个过滤器的应用场景,是非常直观的:使用BinaryComparator可以筛选出具有某个行键的行,或者通过改变比较运算符(下面的例子中是CompareFilter.CompareOp.EQUAL)来筛选出符合某一条件的多条数据,以下就是筛选出行键为row1的一行数据:
[java]view plain
Filterrf=newRowFilter(CompareFilter.CompareOp.EQUAL,newBinaryComparator(Bytes.toBytes("row1")));//OK筛选出匹配的所有的行
[java]view plain
Filterpf=newPrefixFilter(Bytes.toBytes("row"));//OK筛选匹配行键的前缀成功的行
[java]view plain
Filterkof=newKeyOnlyFilter();//OK返回所有的行,但值全是空
[java]view plain
Filterrrf=newRandomRowFilter((float)0.8);//OK随机选出一部分的行
[java]view plain
Filterisf=newInclusiveStopFilter(Bytes.toBytes("row1"));//OK包含了扫描的上限在结果之内
[java]view plain
Filterfkof=newFirstKeyOnlyFilter();//OK筛选出第一个每个第一个单元格
[java]view plain
Filtercpf=newColumnPrefixFilter(Bytes.toBytes("qual1"));//OK筛选出前缀匹配的列
[java]view plain
Filtervf=newValueFilter(CompareFilter.CompareOp.EQUAL,newSubstringComparator("ROW2_QUAL1"));//OK筛选某个(值的条件满足的)特定的单元格
[java]view plain
Filterccf=newColumnCountGetFilter(2);//OK如果突然发现一行中的列数超过设定的最大值时,整个扫描操作会停止
[java]view plain
SingleColumnValueFilterscvf=newSingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("qual2"),
CompareFilter.CompareOp.NOT_EQUAL,
newSubstringComparator("BOGUS"));
scvf.setFilterIfMissing(false);
scvf.setLatestVersionOnly(true);//OK
12.SkipFilter:这是一种附加过滤器,其与ValueFilter结合使用,如果发现一行中的某一列不符合条件,那么整行就会被过滤掉:
[java]view plain
Filterskf=newSkipFilter(vf);//OK发现某一行中的一列需要过滤时,整个行就会被过滤掉
Filterwmf=newWhileMatchFilter(rf);//OK类似于Pythonitertools中的takewhile
List<Filter>filters=newArrayList<Filter>();
filters.add(rf);
filters.add(vf);
FilterListfl=newFilterList(FilterList.Operator.MUST_PASS_ALL,filters);//OK综合使用多个过滤器,AND和OR两种关系
㈤ hbase有几种读法
不明白你问的意思。
首先可以按照rowkey进行查找
还可以按照列族,列,列的值 等等等等
hbase 有很多过滤器,你可以看看 不过效率不是很好
㈥ 求教:怎样用hbase过滤器实现,一个列多列值
HBase为筛选数据提供了一组过滤器,通过这个过滤器可以在HBase中的数据的多回个维度(行,列,数据版本答)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键,列明,时间戳定位)。
㈦ hbase中rowkey设置问题。
主键设计成:现有的主键+频度+列,即h+1+hi,但是最好将每个都格式化成定长的字符串,当你需要取前5个记录时使用过滤器取出前5条记录即可。大体如此,具体细节可能还需要好好设计
㈧ 关于HBase的rowkey设计我想问以下问题
首先过滤器在RegionServer里发挥作用,即在RS层过滤掉客户端不需要的数据,以减少网络传输的数据量,以此减少查询时间,所以不会减少查询的数据量。根据你的需求,个人觉得rowkey应该设计成用户名+学校+学院+姓名,并且将这几个值全部转换成定长的字符串存储,以便查询。
㈨ 如何使用python在hbase里进行模糊查询
#导入thrift和habse包
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *
#此处可以修改地址和端口
host = '192.168.1.1'
#默认端口为9090
port = 9090
#要查询的表名
table = 'table_name'
#定义一个过滤器,此为关键步骤
filter = "RowFilter(=,'regexstring:.3333.')" #此行原创:)
# Make socket
transport = TSocket.TSocket(host, port)
# Buffering is critical. Raw sockets are very slow
# 还可以用TFramedTransport,也是高效传输方式
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
#传输协议和传输过程是分离的,可以支持多协议
protocol = TBinaryProtocol.TBinaryProtocol(transport)
#客户端代表一个用户
client = Hbase.Client(protocol)
#打开连接
try:
transport.open()
scan.filterString=filter
scanner = client.scannerOpenWithScan(table, scan)
except Exception:
finally:
client.scannerClose(scan)
transport.close()
连接代码网上一搜一大堆,非原创,来源已不可考,非本人研究成果;
关键就是这个:"RowFilter(=,'regexstring:.3333.')"
这个过滤器要写对,hbase有十几种内置的过滤器方法,有几种比较运算符和比较器,上面这个是正则方式,即'regexstring:.3333.';
过滤器整个双引号里面的内容会通过thrift传给hbase服务端处理,下划线这部分正则要支持java的正则要求不然会报错
㈩ hbase模糊查询
哈哈哈,恰好我也在做一个类似的问题;hbase权威指南133页,关于rowkey有一个内建的过滤器:
Scan scan = new Scan();
Filter filter = new RowFilter(CompareOp.EQUAL,new RegexStringComparator(".*京Q00"));
scan.setFilter(filter);
ResultScanner scanner;
try {
scanner = table.getScanner(scan);
for(Result res:scanner)
{
System.out.println(res);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
上面内这段代码应该容能够解决你的问题啦,enjoy it.