Talend StudioでS3上にある,同スキーマではあるものの,ファイル自体が分割されたデータを統合する必要があったのでその際のメモになります.
前提条件
「Talend Open Studio for Data Integration」の「Version 8.0.1」で検証を実施しました.
作成したジョブ
今回作成したジョブは下図の通りです.
サンプルデータ
検証用に,下記のようなデータを用意しました.
システムから年月別に売上データをExcel形式で出力するようなイメージです.
このようなファイルがS3にアップロードされているとして,Talendから取得し加工できるように統合します.サンプルデータは,事前にS3へアップロードしておきます.キープレフィックスは「talendinput/」としています.
ジョブの構成
S3への接続
はじめに,PrejobとしてS3との接続を確立します.S3との接続には「tS3Connection」コンポーネントを使用します.
他にもS3関連のコンポーネントを使用するので,はじめに接続情報をもっておくと,後工程で使いまわすことができて便利です.
「tS3Connection」コンポーネントの設定は下記の通りです.
個人の検証用なので,アクセスキー・シークレットアクセスキーを指定する方法としています.また,いずれもコンテキストを使用しています.
S3からのファイル取得
次に,S3からファイルを取得します.
「特定階層下のファイル名一覧の取得」→「取得した名称でファイルを指定しローカルへget」という手順になります.
ファイル名一覧の取得には,「tS3List」を使用します.設定は下記の通りです.「tS3Connection」の接続情報を指定し,バケット・キープレフィックスをコンテキストで指定しています.
ファイルのローカルへの取得には,「tS3Get」を使用します.
ここで,設定のバケット・キー・ファイルは下記の通り前の「tS3List」由来の変数として指定します.
Bucket:((String)globalMap.get("tS3List_1_CURRENT_BUCKET"))
Key:((String)globalMap.get("tS3List_1_CURRENT_KEY"))
File:context.FOLDER_PATH+((String)globalMap.get("tS3List_1_CURRENT_KEY")).replaceAll(context.KEY_PREFIX,"")
Talendでは,globalMapオブジェクトという形で,コンポーネントの情報を格納しており,ジョブの中で参照することが可能です.
ジョブで使用しているコンポーネントのglobalMapオブジェクトはTalend Studio画面左下の「Outline」より参照可能です.「tS3List」の場合,バケット名・キープレフィックス等の情報が参照可能なことがわかります.
試しに下記のようなフローを作成し,「tS3List_X_CURRENT_KEY」の中身を見てみます.
tS3Listの出力はiterateなので,「tlogrow」で閲覧するために「tIterateToflow」をはさんでいます.
※ここでは「tS3_List 2 CURRENT_KEY」となっていることに注意してください.
今回の場合,出力は以下の通りです.「talendinput/」配下に目的のファイル名があることがわかります.「tS3Get」では,.replaceAll()
メソッドを使用し,このキープレフィックス空欄に置き換えることで,ファイル名のみを取得し,指定のフォルダに出力しています.
「tS3Get」ファイル設定
context.FOLDER_PATH+((String)globalMap.get("tS3List_1_CURRENT_KEY")).replaceAll(context.KEY_PREFIX,"")
取得したファイルの統合(ユニオン)
「tS3Get」で出力したファイルを読み込み,統合(ユニオン)します.「対象のファイル名一覧の取得→読み込み→ユニオン」の流れです.
ファイル名の取得には,「tFile_List」を使用します.Directoryに先程の手順でファイルを出力したフォルダを指定します.今回は指定フォルダのすべてのファイルが対象なので特に指定はありませんが,特定の拡張子を持つファイルなど,対象を絞り込むことができます.
出力したファイルは,「tFileInputExcel」で読み込みます.「tS3Get」の時と同様にファイル名は,「tFile_List」のglobalMapオブジェクトである((String)globalMap.get("tFileList_1_CURRENT_FILEPATH"))
を指定します.
データの統合には,「tUnite」コンポーネントを使用します.このコンポーネントについては,下記の記事でわかりやすく取り上げられています.
最後に,今回はtLogRowで出力します.出力結果は下記のようになりました.
S3との接続を閉じる
Postjobとして,明示的にS3との接続を閉じておきます.
まとめ
TalendでS3→ローカルへのファイル展開・統合処理を実施してみました.GUIでこういった処理を機械的に実行できるのは便利だなと感じました.今回は以上になります.