㈠ 有沒有Python寫的spark連接Hbase的例子
博主項目實踐中,經常需要用Spark從Hbase中讀取數據。其中,spark的版本為1.6,hbase的版本為0.98。現在記錄一下如何在spark中操作讀取hbase中的數據。
對於這種操作型的需求,沒有什麼比直接上代碼更簡單明了的了。so,show me the code!
object Demo extends Logging{
val CF_FOR_FAMILY_USER = Bytes.toBytes("U");
val CF_FOR_FAMILY_DEVICE = Bytes.toBytes("D")
val QF_FOR_MODEL = Bytes.toBytes("model")
val HBASE_CLUSTER = "hbase://xxx/"
val TABLE_NAME = "xxx";
val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAME
def genData(sc:SparkContext) = {
//20161229的數據,rowkey的設計為9999-yyyyMMdd
val filter_of_1229 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("79838770"))
//得到qf為w:00-23的數據
val filter_of_qf = new QualifierFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("w"))
val all_filters = new util.ArrayList[Filter]()
all_filters.add(filter_of_1229)
all_filters.add(filter_of_qf)
//hbase多個過濾器
val filterList = new FilterList(all_filters)
val scan = new Scan().addFamily(CF_FOR_FAMILY_USER)
scan.setFilter(filterList)
scan.setCaching(1000)
scan.setCacheBlocks(false)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,HBASE_TABLE )
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()))
sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
//後面是針對hbase查詢結果的具體業務邏輯
.map()
...
def main(args: Array[String]): Unit = {
val Array(output_path) = args
val sparkConf = new SparkConf().setAppName("demo")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(sparkConf)
genUuidWifi(sc).saveAsTextFile(output_path)
sc.stop()
}
}04142434445460414243444546
需要注意的一個小點就是如果hbase里有多個過濾器,注意需要使用FilterList。
㈡ 簡述通過Apache+HBase+API進行表過濾器應用的理解
摘要 過濾器是HBase為客戶端提供的一種高級API,是HBase的一種高級特性,它提供了非常強大的功能幫助用戶處理表中的數據。HBase中讀取數據的API主要是get()和scan(),它們都支持直接讀取數據和通過指定起始行健訪問數據的功能,可以通過添加限定條件如列族、列、時間戳等來限制查詢的數量,但是它們缺少一種細粒度的的篩選功能,比如基於正則表達式的篩選。由此誕生過濾器,Get類和Scan類都支持過濾器,通過方法setFilter(Filter filter)可以設置查詢時的過濾器。
㈢ hbase shell 中有版本過濾器嗎
進入hbase shell console
$HBASE_HOME/bin/hbase shell
如果有kerberos認證,需要事先使用相應的keytab進行一下認證(使用kinit命令),認證成功之後再使用hbase shell進入可以使用whoami命令可查看當前用戶!
㈣ hbase的過濾器有哪些
HBase為篩選數據提供了一組過濾器,通過這個過濾器可以在中的數據的多個維度(行,列,數據版本)上進行對數據的篩選操作,也就是說過濾器最終能夠篩選的數據能夠細化到具體的一個存儲單元格上(由行鍵,列明,時間戳定位)。通常來說,通過行鍵,值來篩選數據的應用場景較多。
1. RowFilter:篩選出匹配的所有的行,對於這個過濾器的應用場景,是非常直觀的:使用BinaryComparator可以篩選出具有某個行鍵的行,或者通過改變比較運算符(下面的例子中是CompareFilter.CompareOp.EQUAL)來篩選出符合某一條件的多條數據,以下就是篩選出行鍵為row1的一行數據:
[java]view plain
Filterrf=newRowFilter(CompareFilter.CompareOp.EQUAL,newBinaryComparator(Bytes.toBytes("row1")));//OK篩選出匹配的所有的行
[java]view plain
Filterpf=newPrefixFilter(Bytes.toBytes("row"));//OK篩選匹配行鍵的前綴成功的行
[java]view plain
Filterkof=newKeyOnlyFilter();//OK返回所有的行,但值全是空
[java]view plain
Filterrrf=newRandomRowFilter((float)0.8);//OK隨機選出一部分的行
[java]view plain
Filterisf=newInclusiveStopFilter(Bytes.toBytes("row1"));//OK包含了掃描的上限在結果之內
[java]view plain
Filterfkof=newFirstKeyOnlyFilter();//OK篩選出第一個每個第一個單元格
[java]view plain
Filtercpf=newColumnPrefixFilter(Bytes.toBytes("qual1"));//OK篩選出前綴匹配的列
[java]view plain
Filtervf=newValueFilter(CompareFilter.CompareOp.EQUAL,newSubstringComparator("ROW2_QUAL1"));//OK篩選某個(值的條件滿足的)特定的單元格
[java]view plain
Filterccf=newColumnCountGetFilter(2);//OK如果突然發現一行中的列數超過設定的最大值時,整個掃描操作會停止
[java]view plain
SingleColumnValueFilterscvf=newSingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("qual2"),
CompareFilter.CompareOp.NOT_EQUAL,
newSubstringComparator("BOGUS"));
scvf.setFilterIfMissing(false);
scvf.setLatestVersionOnly(true);//OK
12.SkipFilter:這是一種附加過濾器,其與ValueFilter結合使用,如果發現一行中的某一列不符合條件,那麼整行就會被過濾掉:
[java]view plain
Filterskf=newSkipFilter(vf);//OK發現某一行中的一列需要過濾時,整個行就會被過濾掉
Filterwmf=newWhileMatchFilter(rf);//OK類似於Pythonitertools中的takewhile
List<Filter>filters=newArrayList<Filter>();
filters.add(rf);
filters.add(vf);
FilterListfl=newFilterList(FilterList.Operator.MUST_PASS_ALL,filters);//OK綜合使用多個過濾器,AND和OR兩種關系
㈤ hbase有幾種讀法
不明白你問的意思。
首先可以按照rowkey進行查找
還可以按照列族,列,列的值 等等等等
hbase 有很多過濾器,你可以看看 不過效率不是很好
㈥ 求教:怎樣用hbase過濾器實現,一個列多列值
HBase為篩選數據提供了一組過濾器,通過這個過濾器可以在HBase中的數據的多回個維度(行,列,數據版本答)上進行對數據的篩選操作,也就是說過濾器最終能夠篩選的數據能夠細化到具體的一個存儲單元格上(由行鍵,列明,時間戳定位)。
㈦ hbase中rowkey設置問題。
主鍵設計成:現有的主鍵+頻度+列,即h+1+hi,但是最好將每個都格式化成定長的字元串,當你需要取前5個記錄時使用過濾器取出前5條記錄即可。大體如此,具體細節可能還需要好好設計
㈧ 關於HBase的rowkey設計我想問以下問題
首先過濾器在RegionServer里發揮作用,即在RS層過濾掉客戶端不需要的數據,以減少網路傳輸的數據量,以此減少查詢時間,所以不會減少查詢的數據量。根據你的需求,個人覺得rowkey應該設計成用戶名+學校+學院+姓名,並且將這幾個值全部轉換成定長的字元串存儲,以便查詢。
㈨ 如何使用python在hbase里進行模糊查詢
#導入thrift和habse包
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
from hbase.ttypes import *
#此處可以修改地址和埠
host = '192.168.1.1'
#默認埠為9090
port = 9090
#要查詢的表名
table = 'table_name'
#定義一個過濾器,此為關鍵步驟
filter = "RowFilter(=,'regexstring:.3333.')" #此行原創:)
# Make socket
transport = TSocket.TSocket(host, port)
# Buffering is critical. Raw sockets are very slow
# 還可以用TFramedTransport,也是高效傳輸方式
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
#傳輸協議和傳輸過程是分離的,可以支持多協議
protocol = TBinaryProtocol.TBinaryProtocol(transport)
#客戶端代表一個用戶
client = Hbase.Client(protocol)
#打開連接
try:
transport.open()
scan.filterString=filter
scanner = client.scannerOpenWithScan(table, scan)
except Exception:
finally:
client.scannerClose(scan)
transport.close()
連接代碼網上一搜一大堆,非原創,來源已不可考,非本人研究成果;
關鍵就是這個:"RowFilter(=,'regexstring:.3333.')"
這個過濾器要寫對,hbase有十幾種內置的過濾器方法,有幾種比較運算符和比較器,上面這個是正則方式,即'regexstring:.3333.';
過濾器整個雙引號裡面的內容會通過thrift傳給hbase服務端處理,下劃線這部分正則要支持java的正則要求不然會報錯
㈩ hbase模糊查詢
哈哈哈,恰好我也在做一個類似的問題;hbase權威指南133頁,關於rowkey有一個內建的過濾器:
Scan scan = new Scan();
Filter filter = new RowFilter(CompareOp.EQUAL,new RegexStringComparator(".*京Q00"));
scan.setFilter(filter);
ResultScanner scanner;
try {
scanner = table.getScanner(scan);
for(Result res:scanner)
{
System.out.println(res);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
上面內這段代碼應該容能夠解決你的問題啦,enjoy it.