HBase Python API

  • 启动hadoop、hbase
cd /opt/hadoop/sbin/
# 注意:格式化NameNode会清空HDFS元数据,仅在首次安装时执行一次,之后每次启动请跳过此步
hadoop namenode -format
./start-all.sh
cd /opt/hbase-1.2.6/bin/
./start-hbase.sh
jps

quicker_7658c6cb-5fcd-41cf-9ebe-f5a876e35ec8.png

hbase-daemon.sh start thrift
hbase shell

quicker_be5eb6c1-c985-4167-b2c6-59cc3e5e9f84.png


  • PyCharm终端安装依赖

Python环境选择2.7版本!!!

Python环境选择2.7版本!!!

Python环境选择2.7版本!!!

quicker_115a773c-f7e4-48e0-b659-ea78bb1251a3.png

python -m pip install -i http://pypi.douban.com/simple --trusted-host pypi.douban.com --upgrade pip

quicker_4777d1dc-fa7c-4b20-92f9-e79ceb4e36ce.png

pip install thrift -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

quicker_09fe22f6-9548-472e-9861-019c437e0c79.png

pip install hbase-thrift -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

quicker_a765389d-7e44-4e26-94eb-ec9b01367c83.png

额外安装一个numpy依赖,用于将查询结果进行格式化处理:

pip install numpy -i http://pypi.douban.com/simple --trusted-host pypi.douban.com

补充说明:scan()的返回值并非常见的Python dict字典类型的数据,而是generator类型(每次迭代产出一行数据对应的dict),直接打印只会显示生成器对象本身。可以先用list()将其转化为list类型,再通过np.array()将原本挤在一行的数据进行格式化、改为多行显示。

quicker_5094669e-4b3c-43cf-b726-e1cad233825b.png

  • Python代码
# coding=utf-8
from thrift.transport import TSocket
from thrift.transport.TTransport import TBufferedTransport
from thrift.protocol import TBinaryProtocol

from hbase import Hbase
from hbase.ttypes import ColumnDescriptor
from hbase.ttypes import Mutation
import numpy as np


class HBaseClient(object):
    """Small convenience wrapper around the HBase Thrift ``Hbase.Client``.

    The connection is opened in the constructor and closed when the object
    is garbage collected. Every method forwards to a single Thrift call (or,
    for ``scan``, a scanner open/get/close sequence).
    """

    def __init__(self, ip, port=9090):
        """Connect to an HBase Thrift server.

        :param ip: host name or IP address of the Thrift server
        :param port: TCP port of the Thrift server
        """
        # Buffered socket transport pointed at the server.
        self.__transport = TBufferedTransport(TSocket.TSocket(ip, port))
        # Binary wire protocol layered on top of the transport.
        protocol = TBinaryProtocol.TBinaryProtocol(self.__transport)
        # Generated Thrift client that issues the actual RPCs.
        self.__client = Hbase.Client(protocol)
        # Open the connection.
        self.__transport.open()

    def __del__(self):
        """Close the transport, if the constructor got far enough to create it."""
        try:
            transport = self.__transport
        except AttributeError:
            # __init__ raised before the transport was assigned; nothing to close.
            return
        transport.close()

    def get_tables(self):
        """Return the table names reported by the server.

        :return: whatever ``getTableNames`` returns (a list of table names)
        """
        return self.__client.getTableNames()

    def create_table(self, table, *columns):
        """Create a table with one column family per name in *columns*.

        :param table: table name
        :param columns: column family names, passed as extra positional arguments
        """
        # A real list (not a lazy map object) so the call also works where
        # map() returns an iterator.
        column_families = [ColumnDescriptor(name) for name in columns]
        self.__client.createTable(table, column_families)
        print("创建成功")

    def put(self, table, row, columns):
        """Write one or more cells of a single row.

        :param table: table name
        :param row: row key
        :param columns: dict mapping column name to the value to store; each
            entry becomes one ``Mutation``
        """
        mutations = [Mutation(column=name, value=value)
                     for name, value in columns.items()]
        self.__client.mutateRow(table, row, mutations)
        print("插入成功")

    def delete(self, table, row, column):
        """Delete cells of one row via the Thrift ``deleteAll`` call.

        :param table: table name
        :param row: row key
        :param column: column argument forwarded unchanged to ``deleteAll``
        """
        self.__client.deleteAll(table, row, column)
        print('删除成功')

    def scan(self, table, start_row="", columns=None):
        """Iterate over the rows of a table.

        This is a generator: nothing is sent to the server until iteration
        starts. Each yielded item is a dict mapping column name to the cell's
        ``value``.

        :param table: table name
        :param start_row: row key to start scanning from
        :param columns: forwarded unchanged to ``scannerOpen``
        """
        scanner = self.__client.scannerOpen(table, start_row, columns)
        try:
            while True:
                rows = self.__client.scannerGet(scanner)
                if not rows:
                    break
                yield {name: cell.value
                       for name, cell in rows[0].columns.items()}
        finally:
            # Release the server-side scanner whether the scan ran to the end,
            # was abandoned early, or failed part-way.
            self.__client.scannerClose(scanner)
        print("查询成功")

def create_tool(basename):
    """Create table *basename* with the 'pid', 'pinfo' and 'padd' column families.

    Uses the module-level ``client``.
    """
    families = ('pid', 'pinfo', 'padd')
    client.create_table(basename, *families)


def put_tool(basename, mystuid):
    """Insert four sample rows into table *basename* through the module-level ``client``.

    Three rows use the fixed keys "0090", "0091" and "0092"; the fourth uses
    *mystuid* as its row key.
    """
    # (row key, name, height, weight, city, room) -- written in this order.
    samples = (
        ("0090", "Jane", "170", "54", "benxi", "A401"),
        ("0091", "Mike", "180", "84", "shenyang", "A402"),
        ("0092", "Peter", "185", "70", "dandong", "B403"),
        (mystuid, "July", "180", "160", "liaoning", "A615"),
    )
    for row_key, name, height, weight, city, room in samples:
        # NOTE(review): the "pinfo:name:" key ends with a colon; update_tool
        # writes to the same key, so it is kept as-is -- confirm it is intended.
        client.put(basename, row_key,
                   {"pinfo:name:": name,
                    "pinfo:high": height,
                    "pinfo:weigh": weight,
                    "padd:city": city,
                    "padd:room": room
                    })


def update_tool(basename, username, mystuid):
    """Overwrite row *mystuid* and change one cell of row "0092".

    Both writes go through the module-level ``client``; *username* is stored
    under the "pinfo:name:" column of row *mystuid*.
    """
    full_row = {"pinfo:name:": username,
                "pinfo:high": "175",
                "pinfo:weigh": "110",
                "padd:city": "hebee",
                "padd:room": "A615"
                }
    weight_only = {"pinfo:weigh": "65"}

    client.put(basename, mystuid, full_row)
    client.put(basename, "0092", weight_only)


def delete_tool(basename):
    """Delete the 'pinfo' and then the 'padd' data of row '0091' in *basename*.

    Uses the module-level ``client``.
    """
    for family in ('pinfo', 'padd'):
        client.delete(basename, '0091', family)


def scan_tool(basename):
    """Scan table *basename* with the module-level ``client`` and print the rows.

    ``client.scan`` is a generator, so it is materialised into a list first;
    wrapping that list in ``np.array`` makes the printed output show one row
    per line.
    """
    rows = client.scan(basename)
    # Parenthesised single-argument form: prints the same text as the Python 2
    # print statement and is also valid Python 3 syntax.
    print(np.array(list(rows)))


if __name__ == '__main__':
    # Demo parameters: the target table, the row key used for the student's
    # own record, and the name that update_tool writes into that record.
    base_name = "stu18309"
    my_stuid = "0009"
    user_name = "liyizhuang"
    # Module-level client shared by every *_tool helper; connects to the
    # Thrift server on this machine using the default port.
    client = HBaseClient("127.0.0.1")

    # Uncomment one group at a time and run the script.

    # Step 1 - create the table, insert the sample rows, then list them.
    # create_tool(base_name)
    # put_tool(base_name, my_stuid)
    # scan_tool(base_name)

    # Step 2 - update two rows, then list the table again.
    # update_tool(base_name, user_name, my_stuid)
    # scan_tool(base_name)

    # Step 3 - delete row '0091', then list the table again.
    # delete_tool(base_name)
    # scan_tool(base_name)

增、查:

quicker_c51829d0-10c7-42bd-8611-b9fa26db5fa6.png

改、查:

quicker_3022cf2a-2052-462f-9e53-ae3601797f99.png

删、查:

quicker_c17575ce-9b16-40a1-b2dc-6b335b34fc01.png


HBASE指令参考
  • 查询全表数据

例:scan "stu18309"

scan "表名"
  • 查询指定行键范围(包含STARTROW,不包含ENDROW)
scan '表名', {STARTROW =>'001',ENDROW => '003'}
  • 删除整行数据
deleteall '表名', '0001'
  • 清空表
truncate '表名'
  • 删除表
disable '表名'
drop '表名'
  • 显示行键中包含0的键值对(substring为子串匹配,并非前缀匹配)
scan '表名',{FILTER=>"RowFilter(=,'substring:0')"}
  • 显示行键字节顺序大于002的键值对
scan '表名',{FILTER=>"RowFilter(>,'binary:002' )"}
  • 显示行键前缀为003的键值对
scan '表名',FILTER=>"PrefixFilter('003')"
  • KeyOnlyFilter:只对cell的键进行过滤和显示,不显示值。
scan '表名',FILTER=>"KeyOnlyFilter()"
  • 统计表的逻辑行数
count '表名',FILTER=>"FirstKeyOnlyFilter()"
scan '表名',FILTER=>"FirstKeyOnlyFilter()"
  • 显示起始行键为001,结束行键为003(包含003)的记录
scan '表名',{STARTROW=>'001',FILTER=> "InclusiveStopFilter('003')"}
  • 显示列族名包含stu(substring)或等于stu(binary)的键值对
scan '表名',FILTER=>"FamilyFilter(=,'substring:stu')"
scan '表名',FILTER=>"FamilyFilter(=,'binary:stu')"
  • 显示列名包含name(QualifierFilter + substring)或以name为前缀(ColumnPrefixFilter)的记录
scan '表名',FILTER=>"QualifierFilter(=,'substring:name')"
scan '表名',FILTER=>"ColumnPrefixFilter('name')"
  • 显示列名以name或age为前缀的记录
scan '表名',FILTER=>"MultipleColumnPrefixFilter('name','age')"
  • 查询列名在bi和na之间的记录
scan '表名',FILTER=>"ColumnRangeFilter('bi',true,'na',true)"
  • 时间戳过滤器(只返回时间戳等于所列值的键值对,此处为2或4)
scan '表名',FILTER=>"TimestampsFilter(2,4)"
  • 查询值等于19的所有键值对
scan '表名',FILTER=>"ValueFilter(=,'binary:19')"
scan '表名',FILTER=>"ValueFilter(=,'substring:19')"
  • 查询personal列族age列中值等于19的所有键值对
scan '表名',{COLUMN=>'personal:age',FILTER=>"SingleColumnValueFilter('personal','age', =,'binary:19')"}
  • 返回行键为001的前3个键值对
get '表名', '001',FILTER=>"ColumnCountGetFilter(3)"
  • 基于行的分页过滤器,设置返回行数
scan '表名',FILTER=>"PageFilter(1)"
  • 显示每行第1列之后的2个键值对
scan '表名',FILTER=>"ColumnPaginationFilter(2,1)"
  • 组合过滤器的使用
scan '表名',FILTER=>"ColumnPaginationFilter(2,1) AND ValueFilter(=,'substring:80')"