ビッグデータ取り扱いの技術② ~解析編~

Cloud 1 (5)


今回は収集したデータをどう解析していくか書いていきます。
解析にはHadoopを利用します。

Hadoopの動作環境はAmazon EMR (Elastic MapReduce)を使います。

【前回の流れ】

  1. fluentdのインストール
  2. td-loggerでアプリケーションログの吐き出し
  3. td-loggerで受け取ったログをS3に送信
  4. S3のデータをHadoopで解析
  5. Hadoopで解析したデータをRDSに突っ込む

※前回の記事はこちら
株式会社ヒトクセ公式ブログ「ビッグデータ取扱いの技術① ~収集編」

今回は前回の続きということで、S3のデータをHadoopで解析から解説します。
Hadoopの処理はSQLライクに書けるHiveを利用します。

S3のデータをHadoopで解析

先程も書きましたが、EMRを利用します。
基本的にはコマンドラインのみでやっていきます。
なのでコマンドラインツールを予めインストールしといてください。

参考図書はこちら
Amazon.co.jp 佐々木達也「Hadoopファーストガイド」

EMRの起動

まずはEMRの起動から
認証系の設定ファイルを書きます
.credentials.jsonをホームディレクトリに追加。内容は以下です

{
  "access-id":     'AWSのkey',
  "private-key":   'AWSのsecret-key',
  "key-pair":      '公開鍵のファイル',
  "key-pair-file": '公開鍵のファイルのパス',
  'region': 'リージョン名[ap-northeast-1(東京)]',
  "log-uri":       ''
}

こちらが参考になります。
gihyo.jp『Amazon Elastic MapReduceの使い方─Hadoopより手軽にはじめる大規模計算』
第3回 「Amazon Elastic MapReduce Ruby ClientでEMRを起動する」


起動コマンドはこちら

elastic-mapreduce --create --alive --name "クラスター名" --key-pair "キーの名前" \
--instance-group master --instance-type m1.small --instance-count 1 \
--bid-price 0.02 \
--instance-group core --instance-type m1.small --instance-count 2 \
--bid-price 0.02 \
--hive-interactive

これで起動が完了です。

解説すると

--create で起動
--alive でシャットダウンを手動で行う設定
--name Hadoopクラスター名を指定
--key-pair インスタンスの鍵を指定
--instance-group master 詳しくは解説しませんが、hadoopにはmaster, core, taskのインスタンスグループがあります。masterとcoreは必ず必要です。
--instance-type インスタンスのサイズなどを指定します
--instance-count そのグループのインスタンス数を指定。masterは1つ。
--bid-price スポットインスタンスの入札価格 
--hive-interactive hiveを使うためのオプション。これがないとhiveが使えません。

スポットインスタンスに関して詳しくはこちら
amazon web services「Amazon EC2 スポットインスタンス」

確認は

elastic-mapreduce --list --active

としたとき

j-[jobflow_id]     WAITING        [public DNS][クラスター名]
   COMPLETED      Setup Hive   

このように
WAITING

COMPLETED
と表示されていれば利用できます。

hiveの利用

sshでmasterノードに入り作業をします

ssh -i [公開鍵のパス] hadoop@[public DNS]

ユーザーはhadoopユーザーを利用します。

sshでログイン後

hive

と入力します。するとhiveのコンソールが表示されます。


まずはS3からデータを取り込みます。
(厳密には取り込むっていい方は違うっぽい(もうちょい詳しくなりたい))
イメージ的にはテーブルを作ってそこにS3のデータを取り込む
この辺はこちらを参考にしました。
Developers.IO「はじめてのEMR/fluentdでS3にアップロードしたログをElastic MapReduceで検索・集計する」

CREATE EXTERNAL TABLE IF NOT EXISTS [テーブル名] (dt string, tag string, json string)
PARTITIONED BY ( PT STRING ) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'

このようなテーブルを作成します。

desc [テーブル名]

とすると

dt string
tag string
json string
pt string

このようなcolumn情報が表示されます。


前回の記事からログは

2014-03-29T00:40:11+09:00   my_app.test {"hoge":"piyo"}

このような感じになります。
なのでイメージ的には

dt  2014-03-29T00:40:11+09:00
tag my_app.test
json {"hoge" : "piyo"}

というデータが入っていきます。

ただこの段階ではソースは指定されていません。
ソースの指定はパーティションの追加により行います。
パーティションの追加は

ALTER TABLE [テーブル名] ADD IF NOT EXISTS PARTITION (pt = '[日付情報など]') LOCATION 's3://[S3のバケット名]'

このように行います。

ここまででhiveでS3に保存したデータを取り扱うことができます。


実際にSELECT文で色々試してみましょう

select * from [テーブル名];

とすると仮にS3の内容が

2014-03-29T00:40:11+09:00   my_app.test {"hoge":"piyo"}
2014-03-29T00:40:11+09:00   my_app.test {"hoge":"fuga"}

だとすると
そのまま

2014-03-29T00:40:11+09:00   my_app.test {"hoge":"piyo"}
2014-03-29T00:40:12+09:00   my_app.test {"hoge":"fuga"}

と出てきます。
jsonが少々扱いにくいのでこのように書くとjsonの中身を利用できます。

select hoge
from [テーブル名] LATERAL VIEW json_tuple(テーブル名.json, hoge) j AS hoge

とすると

piyo
fuga

となります。

さらに条件もjsonの中身のやつを利用できます

select hoge
from [テーブル名] LATERAL VIEW json_tuple(テーブル名.json, hoge) j AS hoge
where hoge = "piyo"

とすると

piyo

のみが取り出せます。

前回のと組み合わせるとアプリケーションログを取得し、
かつhiveにより利用出来るようになりました。



今回はボリュームが多くなってきたので、この辺で。
次回はこれらの処理をrubyから動作させる方法について書いていきたいと思います。

参考図書

佐々木達也「Hadoopファーストガイド」は本当に参考になりました。
まず最初に読むべき本かと思います。

ヒトクセの各種サービスお問い合わせは
こちらからお気軽にどうぞ。
株式会社ヒトクセ お問い合わせフォーム