Ruby版ユーザライブラリ¶
インストール¶
- バージョン2.1.0以降のRubyが導入されている必要があります。
- Gemパッケージは、Rubygems.orgにて公開されています。
$ gem install iij-dag-client
また以下のURLからGemファイルをダウンロードし、Gemファイル名を指定してインストールすることも可能です。
$ gem install iij-dag-client-1.0.1.gem
使い方¶
clientの生成¶
アクセスキーIDおよびシークレットアクセスキーを設定し、clientを生成します。
Dag::Client.new(<アクセスキーID>, <シークレットアクセスキー>, <オプション>)
パラメータについて¶
アクセスキーIDおよびシークレットアクセスキーに加え、以下の表に示すオプションを設定することが出来ます。
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
apikey | yes | String | アクセスキーID | |
secret | yes | String | シークレットアクセスキー | |
analysis_api | no | String | https://analysis-dag.iijgio.com | アナリシスAPIのエンドポイント |
storage_api | no | String | https://storage-dag.iijgio.com | ストレージAPIのエンドポイント |
force_path_style | no | Boolean | false | |
debug | no | Boolean | false | デバッグモード |
サンプル¶
clientを生成
require 'dag'
client = Dag::Client.new("xxxxxxxxxx", "xxxxxxxxxxxxxxxxxxx")
オプションを設定する場合
require 'dag'
params = {
force_path_style: false,
debug: true
}
client = Dag::Client.new("xxxxxxxxxx", "xxxxxxxxxxxxxxxxxxx", params)
エラーレスポンス¶
名前 | 概要 |
---|---|
message | APIの実行時の情報 |
api_code | エラーコード |
api_message | エラーメッセージ |
api_status | エラーステータス |
api_request_id | エラー時のリクエストID |
api_resource | エラー時のパス情報 |
例外を捕捉するサンプル
require 'dag'
client = Dag::Client.new("xxxxxxxxxx", "xxxxxxxxxxxxxxxxxxx")
begin
# clustersの取得に失敗した場合の例外を捕捉する
client.clusters.each do |cluster|
puts cluster.name
end
rescue Dag::Client::APIFailure => e
puts e.message
puts e.api_code
puts e.api_message
puts e.api_status
puts e.api_request_id
puts e.api_resource
end
また、上記以外のエラーレスポンスには以下のものがあります。
名前 | 概要 |
---|---|
ParameterInvalid | 不正なパラメータが設定された |
APIOptionInvalid | 不正なAPIオプションが設定された |
StatusInvalid | クラスタのステータスが不正である |
JobTypeInvalid | ジョブ種別が不正である |
ClusterNotOpen | クラスタが指定されていない |
ClusterRebooted | クラスタが再起動された |
DatabaseNotFound | 指定されたデータベースが見つからない |
TableAlreadyExists | 作成しようとしたテーブルは既に存在している |
TableNotFound | 指定されたテーブルが見つからない |
機能一覧¶
クラスタ管理¶
機能名 | 概要 |
---|---|
クラスタ一覧取得 | 保有しているクラスタの一覧を取得します。 |
クラスタ情報取得 | 指定したクラスタの情報を取得します。 |
クラスタ統計情報取得 | 指定したクラスタの統計情報を取得します。 |
クラスタ再起動 | 指定したクラスタの再起動を行います。 |
クラスタログ出力 | 指定したクラスタのアプリケーションログをストレージへ出力します。 |
データベース管理¶
機能名 | 概要 |
---|---|
データベース一覧取得 | データベースの一覧を取得します。 |
データベース情報取得 | 指定したデータベースの情報を取得します。 |
データベース作成 | データベースを作成します。 |
データベース削除 | データベースを削除します。 |
テーブル管理¶
機能名 | 概要 |
---|---|
テーブル一覧取得 | テーブルの一覧を取得します。 |
テーブル情報取得 | 指定したテーブルの情報を取得します。 |
テーブル作成 | テーブルを作成します。 |
テーブル作成(分割) | オブジェクトを分割し、新たなテーブルを作成します。 |
テーブル情報更新 | テーブルの情報を更新します。 |
テーブル削除 | テーブルを削除します。 |
ジョブ管理¶
機能名 | 概要 |
---|---|
ジョブ一覧取得 | ジョブの一覧を取得します。 |
ジョブ情報取得 | 指定したジョブの情報を取得します。 |
ジョブ実行結果取得 | ジョブの実行結果を取得します。 |
ジョブ実行ログ取得 | ジョブの実行ログを取得します。 |
機能詳細¶
クラスタ管理¶
クラスタ一覧取得¶
# 全てのクラスタ一覧を取得
client.clusters.each do |cluster|
puts cluster.name
#=> "cluster01"
#=> "cluster02"
end
# 通常起動中のクラスタ一覧を取得
client.clusters.where(status: 'norm').each do |cluster|
puts cluster.name
end
# 複数のステータスを指定
client.clusters.where(status: ['norm', 'error']).each do |cluster|
puts cluster.name
end
# クラスタ名を指定
client.clusters.where(cluster_name: 'cluster').each do |cluster|
puts cluster.name
end
# クラスタタイプを指定
client.clusters.where(type: 'DAG5-Hive-Hadoop').each do |cluster|
puts cluster.name
end
クラスタ一覧を昇順、又は降順で取得します。
# 昇順
client.clusters.order(:asc).each do |cluster|
puts cluster.name
end
# 降順
client.clusters.order(:desc).each do |cluster|
puts cluster.name
end
クラスタ情報取得¶
クラスタ情報の詳細を取得します。
# クラスタを選択
client.open('cluster1')
# cluster1の情報を取得
cluster = client.cluster
puts cluster.name
#=> cluster1
puts cluster.status
#=> norm
puts cluster.type
#=> 'DAG5-Hive-Hadoop'
cluster.instances.each do |instance|
puts instance.grade
puts instance.quantity
#=> a-1-010
#=> 2
end
項目名 | 型 | 概要 |
---|---|---|
name | String | クラスタ名 |
status | String | ステータス |
type | String | DAG5-Hive-Hadoop |
instances | Array | インスタンス情報の配列 |
項目名 | 型 | 概要 |
---|---|---|
grade | String | グレード |
quantity | Integer | 個数 |
クラスタ統計情報取得¶
クラスタ統計情報を取得します。
# クラスタを選択
client.open('cluster1')
# cluster1の統計情報を取得
cluster_statistics = client.cluster.statistics
# 統計情報の中からインスタンスに関する情報を取得
cluster_statistics.instances.each do |instance|
puts instance.instance_id
puts instance.grade
puts instance.disk.dfs_used
puts instance.disk.non_dfs_used
puts instance.disk.capacity
#=> 1
#=> a-1-010
#=> 3522560
#=> 35790848
#=> 107321753600
end
# 統計情報の中からディスクに関する情報を取得
puts cluster_statistics.disk.capacity
puts cluster_statistics.disk.used
#=> 107321753600
#=> 39313408
項目名 | 型 | 概要 |
---|---|---|
instances | Array | インスタンス情報の配列 |
disk | OpenStruct | クラスタ全体のディスク情報 |
項目名 | 型 | 概要 |
---|---|---|
insntace_id | Integer | ID |
grade | String | グレード |
disk | OpenStruct | インスタンス毎のディスク情報 |
項目名 | 型 | 概要 |
---|---|---|
capacity | Integer | インスタンス毎のディスク許容量 |
dfs_used | Integer | 分散ファイルシステムのディスク使用量 |
non_dfs_used | Integer | 分散ファイルシステム以外のディスク使用量 |
項目名 | 型 | 概要 |
---|---|---|
capacity | Integer | クラスタ全体のディスク許容量 |
used | Integer | クラスタ全体のディスク使用量 |
クラスタ再起動¶
クラスタのステータスが、norm、failed、ptfailed の時のみクラスタの再起動が可能です。
# クラスタを選択
client.open('cluster1')
cluster = client.cluster
# 再起動
cluster.restart
# パラメータを指定して再起動
cluster.restart(type: 'DAG5-Hive-Hadoop', force: true)
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
type | no | String | 現在のクラスタ種別 | 再起動後にクラスタタイプを変更したい場合に指定 |
force | no | Boolean | false | 強制再起動 |
クラスタログ出力¶
指定したクラスタのアプリケーションログをストレージへ出力します。
# クラスタを選択
client.open('cluster1')
cluster = client.cluster
# cluster1のログをストレージに出力
cluster.export_log(output_log_path: 'dag://bucket/object_path/', compress: true)
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
output_log_path | yes | String | 出力先のストレージパスを指定 | |
compress | no | Boolean | false | 圧縮して出力するか指定 |
データベース管理¶
データベース一覧取得¶
# クラスタを選択
client.open('cluster1')
client.databases.each do |db|
puts db.name
#=> db1
#=> db2
end
データベース情報取得¶
データベースの情報を取得します。
# クラスタを選択
client.open('cluster1')
# データベースを選択
db = client.database('db1')
puts db.name
#=> db1
puts db.db_name
#=> db1
puts db.cluster_name
#=> cluster1
# 詳しくはテーブル管理を参照
db.table
#=> Table
項目名 | 型 | 概要 |
---|---|---|
name | String | データベース名 |
db_name | String | name のエイリアス |
cluster_name | String | クラスタ名 |
table | TableCollection | テーブル一覧取得を参照 |
データベース作成¶
指定したクラスタにデータベースを作成します。
# クラスタを選択
client.open('cluster1')
# データベースを作成
db1 = client.databases.create('db1')
db2 = client.create_database('db2')
puts db1.name
#=> db1
puts db2.name
#=> db2
データベース削除¶
データベースを削除します。
# クラスタを選択
client.open('cluster1')
# db1を削除
db = client.database('db1')
db.delete
テーブル管理¶
テーブル一覧取得¶
取得可能なテーブル情報の詳細については、「テーブル情報取得」を参照してください。
# クラスタを選択
client.open('cluster1')
# 全てのデータベースに存在するテーブル情報を表示
client.databases.each do |db|
db.tables.each do |table|
puts table.name
#=> table1
#=> table2
end
end
テーブル情報取得¶
テーブルの詳細情報を取得します。
# クラスタを選択
client.open('cluster1')
# DBを選択
db = client.database('db1')
# テーブルを選択
table = db.table('table1')
puts table.name
#=> table1
puts table.format
#=> json
puts table.schema
#=> v map<string, string>
puts table.comment
#=> sample table
puts table.location
#=> dag://db1/table1
puts table.created_at
#=> 2015-03-02 14:48:57 +0900
puts table.modified_at
#=> 2015-03-02 14:48:57 +0900
puts table.cluster_name
#=> cluster1
puts table.db_name
#=> db1
項目名 | 型 | 概要 |
---|---|---|
name | String | テーブル名 |
format | String | フォーマット |
schema | String | スキーマ |
comment | String | コメント |
location | String | ロケーション |
created_at | Time | 作成日時 |
modified_at | Time | 更新日時 |
cluster_name | String | クラスタ名 |
db_name | String | データベース名 |
テーブル作成¶
テーブルを作成します。
# クラスタを選択
client.open('cluster1')
# データベースを選択
db = client.database('db1')
params = {
table: 'table1',
format: 'csv',
comment: 'test comment',
schema: 'v array<string>'
}
# テーブルを作成
db.tables.create(params)
もしくは以下の方法でもテーブルを作成できます。
# clientから作成する場合
params = {
format: 'csv',
comment: 'test comment',
schema: 'v array<string>'
}
# テーブルを作成
client.create_table('db1', 'table1', params)
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 | 備考 |
---|---|---|---|---|---|
table | yes | String | テーブル名 | ||
format | no | String | json_agent | csv, tsv, json, json_agent の何れか | |
schema | no | String | formatの値に対応したデフォルトのスキーマ | スキーマ | |
comment | no | String | コメント |
テーブル作成(分割)¶
# クラスタを選択
client.open('cluster1')
params = {
input_object_keys: ['dag://bucket/object1', 'dag://bucket/object2'],
input_format: 'csv',
label: 'label',
}
# テーブル分割
client.split_table('cluster1', 'db1', 'table1', params)
設定可能なパラメータに関しては以下の表をご覧ください。
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
input_object_keys | yes | Array | バケット名/オブジェクト名の配列 | |
input_format | no | String | json_agent | csv, tsv, json, json_agent の何れか |
schema | no | String | formatの値に対応したデフォルトのスキーマ | スキーマ |
label | no | String | split job に対するラベル |
テーブル情報更新¶
# クラスタを選択
client.open('cluster1')
# テーブルを選択
table = client.database('db1').table('table1')
params = {
comment: 'update comment',
format: 'update format',
schema: 'update schema'
}
# テーブル情報を更新
table.update(params)
テーブル削除¶
テーブルを削除します。
# クラスタを選択
client.open('cluster1')
# テーブルを削除
table = client.database('db1').table('table1')
table.delete
ジョブ管理¶
ジョブ一覧取得¶
client.jobs.each do |job|
puts job.status
end
ジョブ一覧に検索条件を加えた場合
# ステータスが実行中のものを取得
client.jobs.where(status: 'running').each do |job|
puts job.id
end
# ステータスがキャンセル、エラーのものを取得
client.jobs.where(status: ['canceled', 'error']).each do |job|
puts job.id
end
# タイプがselectのものを取得
client.jobs.where(type: 'select').each do |job|
puts job.id
end
# 前方一致でクラスタ名が一致するものを取得
client.jobs.where(cluster_name: 'cluster1').each do |job|
puts job.id
end
# 前方一致でラベルが一致するものを取得
client.jobs.where(label: 'label1').each do |job|
puts job.id
end
# クラスタが再起動済みのクラスタを返す
client.jobs.where(cluster_rebooted: true).each do |job|
puts job.id
end
ジョブの一覧を昇順または降順で取得する場合
# 昇順
client.jobs.order(:asc).each do |job|
puts job.id
end
# 降順
client.jobs.order(:desc).each do |job|
puts job.id
end
ジョブの表示数を絞る場合
# 100個取得
# 引数が与えられない場合、デフォルト値として100個取得する
client.jobs.limit.each do |job|
puts job.id
end
# 昇順で10個取得
client.jobs.order(:asc).limit(10).each do |job|
puts job.id
end
# ステータスがfinishedのものを10個取得
client.jobs.where(status: 'running').limit(10).each do |job|
puts job.id
end
whereメソッドに以下のパラメータを設定することでオブジェクトを絞り込んで取得できます。
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
status | no | String or Array | ジョブのステータスを指定 | |
type | no | String | ジョブのタイプを指定 | |
cluster_name | no | String | ジョブが実行されたクラスタ名を指定 | |
label | no | String | ジョブのラベルを指定 | |
cluster_rebooted | no | Boolean | ジョブの実行後にクラスタが再起動されたか指定 |
ジョブ情報取得¶
ジョブの詳細情報を取得します。
# ジョブを選択
job = client.job(1038)
puts job.id
#=> 1038
puts job.type
#=> select
puts job.process_engine
#=> mapreduce
puts job.dsl
#=> hive
puts job.status
#=> finished
puts job.cluster
#=> mmoge
puts job.start_at
#=> 2015-03-02 12:35:50 +9000
puts job.access_key_id
#=> xxxxxxxxxxxxxxxxx
puts job.label
#=> test label
puts job.progress
#=> nil
puts job.finished?
#=> true(or false)
job.reload
ジョブのtypeにより、取得可能なジョブ情報が異なります。
項目名 | 型 | 概要 |
---|---|---|
id | Integer | ジョブID |
type | String | select または split |
process_engine | String | 実行エンジン |
dsl | String | hive または mapreduce |
status | String | ジョブステータス |
cluster | String | クラスタ名 |
start_at | Time | 開始時間 |
access_key_id | String | アクセスキー |
label | String | ラベル |
progress | String | 進捗率(ex. ‘70%’) |
finished? | Boolean | ジョブが終了しているか |
reload | Nil | ジョブ情報更新 |
項目名 | 型 | 概要 |
---|---|---|
stage | Integer | ステージ |
query | String | 実行クエリ |
output_format | String | 出力フォーマット(csv またはtsv) |
output_object | String | 出力オブジェクト |
項目名 | 型 | 概要 |
---|---|---|
job_id | String | ジョブID |
schema | String | スキーマ情報 |
input_object_keys | Array | オブジェクト情報の配列 |
input_format | String | 入力フォーマット(csvまたはtsv) |
output_bucket | String | 出力先バケット名 |
ouput_table | String | 出力先テーブル名 |
ジョブ実行結果取得¶
# ジョブを選択
job = client.job(1038)
# ジョブの結果ダウンロードリンクを生成
puts job.download_urls
#=> https://bucket1.storage-dag.iijgio.com/object1/.resultsetmetadata?Expires=.....
#=> https://bucket1.storage-dag.iijgio.com/object1/000000_0?Expires=.....
# ダウンロード可能時間を5分に変更
puts job.download_urls(5) # 分で指定
#=> https://bucket1.storage-dag.iijgio.com/object1/.resultsetmetadata?Expires=.....
#=> https://bucket1.storage-dag.iijgio.com/object1/000000_0?Expires=.....
ジョブ実行ログ取得¶
ジョブ実行後にクラスタを再起動した場合、ログを取得することはできません。
# ジョブを選択
job = client.job(1038)
# ジョブのログを取得
puts job.log
もしくは以下の方法でログ情報を取得できます。
# ジョブのログを取得
puts client.job_log(1038)
ストレージ管理¶
ストレージの操作を行うことができます。
バケット一覧取得¶
バケット一覧を取得します。
client.buckets.each do |bucket|
puts bucket.name
#=> bucket1
#=> bucket2
end
バケット作成¶
バケットを作成します。
# バケットを作成
bucket = client.buckets.create('bucket')
puts bucket.name
#=> bucket
以下の方法で作成することもできます。
# バケットを作成
bucket = client.create_bucket('bucket')
puts bucket.name
#=> bucket
バケット削除¶
バケットを削除します。
# バケットを選択
bucket = client.buckets['bucket']
# バケットを削除
bucket.delete
以下の方法で削除することもできます。
# バケットを削除
client.delete_bucket('bucket')
オブジェクト一覧取得¶
以下のようにしてオブジェクトの一覧を取得可能です。
# バケットを選択
bucket = client.buckets['bucket']
# オブジェクト一覧を取得
bucket.objects.each do |object|
puts object.name
#=> object1/
#=> object1/test1
#=> object2/
end
# 接頭辞を指定して検索することが可能
bucket.objects.where(prefix: 'object1').each do |object|
puts object.name
#=> object1/
#=> object1/test1
end
delimiterを指定した場合の例
# ストレージ上に存在するデータ
# /foo/photo/2009/index.html
# /foo/photo/2009/12/index.html
# /foo/photo/2010/index.html
# /foo/photo/2010/xmas.jpg
# /foo/photo/2010/01/index.html
# /foo/photo/2010/01/xmas.jpg
# /foo/photo/2010/02/index.html
# バケットを選択
bucket = client.buckets['bucket']
# 以下の条件でdelimiterを指定した場合
bucket.objects.where(prefix: '/foo/photo/2010', delimiter: '/').each do |object|
puts object.name
#=> /foo/photo/2010/index.html
#=> /foo/photo/2010/xmas.jpg
#=> /foo/photo/2010/01
#=> /foo/photo/2010/02
end
whereメソッドの以下のパラメータを設定することでオブジェクトを絞り込んで取得できます。
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
prefix | no | String | 前方一致でフィルタするための文字列 | |
delimiter | no | String | オブジェクト階層の区切り文字 |
オブジェクト取得¶
オブジェクトを取得します。
# バケットを選択
bucket = client.buckets['bucket']
# オブジェクトを選択
object = bucket.objects['object1/test.gz']
puts object.name
#=> object1/test.gz
# オブジェクトからデータを読み込む
puts object.read
#=> "abcdefg"
# オブジェクトから3byte分データを読み込む
puts object.read(3)
#=> "abc"
オブジェクト作成¶
# バケットを選択
bucket = client.buckets['bucket']
# オブジェクトを選択
object = bucket.objects['object1/test.json']
# 文字列
object.write('testobject')
# パス
object.write(Pathname.new('./object_1.json'))
マルチパートアップロードでオブジェクトを作成する
# バケットを選択
bucket = client.buckets['bucket']
# オブジェクトを選択
object = bucket.objects['object1/test.json']
# マルチパートアップロードでオブジェクトを作成する
object.write(Pathname.new('./object_1.json'), multipart: true, jobs: 10, splitsz: 10_000_000)
マルチパートアップロードを使用する場合は以下のオプションを指定することができます。
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
multipart | no | Boolean | false | マルチパートアップロードを有効にする |
jobs | no | String | 1 | マルチパートアップロードの並列アップロード数 |
splitsz | no | Integer | 104857600 | オブジェクト分割サイズ。5MBバイト以上を指定 |
オブジェクト削除¶
オブジェクトを削除します。
# バケットを選択
bucket = client.buckets['bucket']
# オブジェクトを選択
object = bucket.objects['object1/test.json']
# オブジェクトを削除
object.delete
以下の方法で削除することもできます。
# オブジェクトを削除
client.delete_object('bucket', 'object_1/test.json')
データインポート¶
インポート可能なデータの形式例
{"key1":"value1", "key2":"value2"}
{"key3":"value3", "key4":"value4"}
.
.
.
# インポート対象のデータベース作成
db = client.databases.create('database1')
# インポート対象のテーブル作成
db.tables.create(table: 'table1', format: 'json')
# インポートしたいファイル
files = ['log/data1.log.gz', 'log/data2'log.gz']
# インポート時のパラメータ
params = { label: "importdata", jobs: 3 }
# 複数のファイルをdatabase1/table1にインポートする
client.import('database1', 'table1', files, params)
#=> Initialize...
#=> jobs: 3, splitsz: 104857600
#=> > starting upload part 1, 189
#=> < finished upload part 1, 189
#=> > starting upload part 2, 954
#=> < finished upload part 2, 954
#=> finished upload 2 objects.
#=> upload_objects:
#=> /database1/table1/importdata_0.gz
#=> /database1/table1/importdata_1.gz
インポートには以下のパラメータを設定できます。
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 |
---|---|---|---|---|
label | no | String | label | インポートした後のファイル名 |
jobs | no | Integer | 1 | 並列実行するスレッド数 |
splitsz | no | Integer | 104857600 | ファイルの分割サイズ |
クエリ¶
HiveQLを発行します。
クエリ発行¶
params = {
query: "SELECT COUNT(1) FROM db.table",
output_format: 'csv',
output_object: 'dag://bucket1/output_object1',
label: 'label1'
}
# クエリを発行するとジョブになる
job = client.query(params)
パラメータ名 | 必須 | 型 | デフォルト値 | 概要 | 備考 |
---|---|---|---|---|---|
query | yes | String | 実行対象のクエリ | 先頭がSELECT文以外、またはクエリ中に”OVERWRITE”が含まれていた場合はエラーとなります。 | |
output_format | no | String | csv | csv または tsv | |
output_resource_path | yes | String | 出力オブジェクトのパス /#{バケット名}/#{オブジェクト名}/ | ||
label | no | String | ラベル |