php 操作Hbase

首页

phper对大数据生态方面算是比较陌生,这个主要是因为php这几年几乎是脱离了大数据的生态在发展(其实是萎缩)。大多hadoop生态圈的产品一般只提供java、C++ Api或者是python操作库,所以当我们想使用hadoop生态中的产品时都不知道如何入手。好在有他们还支持Thrift,让phper有进入hadoop生态圈的入口。话不多说,再次我就以Hbase为例,一步一步的介绍php怎么来和Hbase交互进行数据操作。

首先是要解决Hbase Thrift的工具包的问题,其实网上有很多教程,看他们的操作库的引入方式就知道大多数都是介绍比较老的Hbase版本。放在高一些的版本不一定适用。我们以Hbase 1.2为例进行介绍。

一、安装Thrift

有win和*nix版本,不过在*nix系统需要编译安装。下面是thrift网站http://thrift.apache.org/

二、下载Hbase 1.2 版本的源码包,Hbase安装部署我们就不在这里介绍。

进入到Hbase目录,定位到 hbase-1.2.0\hbase-thrift\src\main\resources\org\apache\hadoop\hbase\thrift下;现在我们就可以使用我们安装的 thrift 进行php habse api的生生成了。比如 我把 thrift安装在 d:\software\thrift\,那我们就执行 d:\software\thrift\ thrift.exe –gen php ./Hbase.thrift 。如果执行没问题后就会在当前目录下生成一个 “gen-php的目录”,点进去看下,会包括如下文件:

三、使用composer自动加载组合 Hbase thirft 和 Thrift php lib

首先我们随便在哪里新建一个目录,就取名为 “ rhbase ”。如果是下载的Thrift源码编译安装的(否则去官网下载对于版本的源码),直接进入到“\thrift-0.12.0\lib\php\lib” 下把所有文件拷贝到 rhbase/src/Thrift 目录下。另外到 “rhabse/ src ”下再新建一个目录,目录名称为“Hbase”,把刚才通过thirft.exe 生成的代码文件全部拷贝到Hbase目录下。

确保我们的系统安装了 composer,如果没有安装则现在安装compoer。之后进入到 “rhbase” 目录执行 composer init ,一路 enter 就可以了,顺利的话就会在目录下生成一个composer.json文件。我们打开json文件进行编辑:

   {
    "name": "xxx/rhbase", // 这里是包名
    "type": "library",
    "authors": [
        {
            "name": "56itec",
            "email": "xxxx@56itec.com"
        }
    ],
    "require": {},
    "autoload": {
        "psr-4": {
            "Hbase\\": "src/Hbase", // 注意这里
            "Thrift\\": "src/Thrift" // 和 这里
        }
    }
}

编辑好了,就在当前目录执行 composer install ,这个过程可能需要点时间。成功之后会在当前目录下生成一个 vendor 文件夹,如果对composer熟悉的接下来肯定就知道怎么做了。

最后我们下一个操作Hbase的例子:

<?php
include __DIR__ . '/rhbase/vendor/autoload.php';

use Hbase\ColumnDescriptor;
use Hbase\AlreadyExists;
use Hbase\Mutation;

$host = '192.168.6.183';
$port = 9090;
$socket = new \Thrift\Transport\TSocket($host, $port);
$socket->setRecvTimeout(5000);
$socket->setSendTimeout(5000);
$transport = new \Thrift\Transport\TBufferedTransport($socket);
$protocol = new \Thrift\Protocol\TBinaryProtocol($transport);
$client = new \Hbase\HbaseClient($protocol);
$transport->open();


$tables = $client->getTableNames();
sort($tables);
foreach ($tables as $name) {
    echo nl2br("  found: {$name}\n");
}
$columns = array(new ColumnDescriptor(array('name' => 'entry:', 'maxVersions' => 10)), new ColumnDescriptor(array('name' => 'unused:')));
$t = "table1";
echo "creating table: {$t}\n";
try {
    $client->createTable($t, $columns);
} catch (AlreadyExists $ae) {
    echo "WARN: {$ae->message}\n";
}
echo "column families in {$t}:\n";
$descriptors = $client->getColumnDescriptors($t);
asort($descriptors);
foreach ($descriptors as $col) {
    echo "  column: {$col->name}, maxVer: {$col->maxVersions}\n";
}

$row = "unused";
$valid = "foobar-生ビ" . microtime(true);
$mutations = array(new Mutation(array('column' => 'entry:foo2', 'value' => $valid)));
// 多记录批量提交(200提交一次时测试小记录大概在5000/s左右):  $rows = array('timestamp'=>$timestamp, 'columns'=>array('txt:col1'=>$col1, 'txt:col2'=>$col2, 'txt:col3'=>$col3));  $records = array(rowkey=>$rows,...);  $batchrecord = array();  foreach ($records as $rowkey => $rows) {      $timestamp = $rows['timestamp'];      $columns = $rows['columns'];      // 生成一条记录      $record = array();      foreach($columns as $column => $value) {          $col = new Mutation(array('column'=>$column, 'value'=>$value));          array_push($record, $col);      }      // 加入记录数组      $batchTmp = new BatchMutation(array('row'=>$rowkey, 'mutations'=>$record));      array_push($batchrecord, $batchTmp);  }  $ret = $hbase->mutateRows('test', $batchrecord);
$client->mutateRow($t, $row, $mutations, []);

print_r($client->get($t,$row,'entry:foo2',[]));
print_r($client->getRow($t, $row, []));

$transport->close();

搞定,是不是很简单

发表评论

电子邮件地址不会被公开。 必填项已用*标注