yy16ki’s diary


スポンサードリンク

Talend StudioでS3からの複数ファイル読み込み・統合

Talend StudioでS3上にある,同スキーマではあるものの,ファイル自体が分割されたデータを統合する必要があったのでその際のメモになります.

前提条件

「Talend Open Studio for Data Integration」の「Version 8.0.1」で検証を実施しました.

作成したジョブ

今回作成したジョブは下図の通りです.

f:id:yy16ki:20220318235026p:plain
ジョブ

サンプルデータ

検証用に,下記のようなデータを用意しました.
システムから年月別に売上データをExcel形式で出力するようなイメージです.

f:id:yy16ki:20220318235415p:plain
サンプルデータ
このようなファイルがS3にアップロードされているとして,Talendから取得し加工できるように統合します.サンプルデータは,事前にS3へアップロードしておきます.キープレフィックスは「talendinput/」としています.
f:id:yy16ki:20220318235908p:plain
S3へアップロード

ジョブの構成

S3への接続

はじめに,PrejobとしてS3との接続を確立します.S3との接続には「tS3Connection」コンポーネントを使用します.
他にもS3関連のコンポーネントを使用するので,はじめに接続情報をもっておくと,後工程で使いまわすことができて便利です.

f:id:yy16ki:20220319001431p:plain
Prejob
「tS3Connection」コンポーネントの設定は下記の通りです.
個人の検証用なので,アクセスキー・シークレットアクセスキーを指定する方法としています.また,いずれもコンテキストを使用しています.
f:id:yy16ki:20220319002157p:plain
tS3Connection:設定

S3からのファイル取得

次に,S3からファイルを取得します.
「特定階層下のファイル名一覧の取得」→「取得した名称でファイルを指定しローカルへget」という手順になります.

f:id:yy16ki:20220319002555p:plain
S3からファイル取得
ファイル名一覧の取得には,「tS3List」を使用します.設定は下記の通りです.「tS3Connection」の接続情報を指定し,バケット・キープレフィックスをコンテキストで指定しています.
f:id:yy16ki:20220319002949p:plain
tS3List:設定
ファイルのローカルへの取得には,「tS3Get」を使用します.
f:id:yy16ki:20220319003411p:plain
tS3Get:設定
ここで,設定のバケット・キー・ファイルは下記の通り前の「tS3List」由来の変数として指定します.

Bucket:((String)globalMap.get("tS3List_1_CURRENT_BUCKET"))
Key:((String)globalMap.get("tS3List_1_CURRENT_KEY"))
File:context.FOLDER_PATH+((String)globalMap.get("tS3List_1_CURRENT_KEY")).replaceAll(context.KEY_PREFIX,"")

Talendでは,globalMapオブジェクトという形で,コンポーネントの情報を格納しており,ジョブの中で参照することが可能です.

www.talendbyexample.com

ジョブで使用しているコンポーネントのglobalMapオブジェクトはTalend Studio画面左下の「Outline」より参照可能です.「tS3List」の場合,バケット名・キープレフィックス等の情報が参照可能なことがわかります.

f:id:yy16ki:20220319010234p:plain
Outline:tS3List

試しに下記のようなフローを作成し,「tS3List_X_CURRENT_KEY」の中身を見てみます.
tS3Listの出力はiterateなので,「tlogrow」で閲覧するために「tIterateToflow」をはさんでいます.
※ここでは「tS3_List 2 CURRENT_KEY」となっていることに注意してください.

f:id:yy16ki:20220319010649p:plain
tS3_List_CURRENT_KEYの表示

f:id:yy16ki:20220321111351p:plain
tIterateToFlow:設定

今回の場合,出力は以下の通りです.「talendinput/」配下に目的のファイル名があることがわかります.「tS3Get」では,.replaceAll()メソッドを使用し,このキープレフィックス空欄に置き換えることで,ファイル名のみを取得し,指定のフォルダに出力しています.

f:id:yy16ki:20220319011011p:plain
tS3_List_CURRENT_KEY

「tS3Get」ファイル設定

context.FOLDER_PATH+((String)globalMap.get("tS3List_1_CURRENT_KEY")).replaceAll(context.KEY_PREFIX,"")

取得したファイルの統合(ユニオン)

「tS3Get」で出力したファイルを読み込み,統合(ユニオン)します.「対象のファイル名一覧の取得→読み込み→ユニオン」の流れです.

f:id:yy16ki:20220319012023p:plain
sampale

ファイル名の取得には,「tFile_List」を使用します.Directoryに先程の手順でファイルを出力したフォルダを指定します.今回は指定フォルダのすべてのファイルが対象なので特に指定はありませんが,特定の拡張子を持つファイルなど,対象を絞り込むことができます.

f:id:yy16ki:20220321111116p:plain
tFile_List:設定

出力したファイルは,「tFileInputExcel」で読み込みます.「tS3Get」の時と同様にファイル名は,「tFile_List」のglobalMapオブジェクトである((String)globalMap.get("tFileList_1_CURRENT_FILEPATH"))を指定します.

f:id:yy16ki:20220321111155p:plain
tFileInputExcel:設定

データの統合には,「tUnite」コンポーネントを使用します.このコンポーネントについては,下記の記事でわかりやすく取り上げられています.

dev.classmethod.jp

最後に,今回はtLogRowで出力します.出力結果は下記のようになりました.

f:id:yy16ki:20220321111242p:plain
出力

S3との接続を閉じる

Postjobとして,明示的にS3との接続を閉じておきます.

f:id:yy16ki:20220321123907p:plain
PostJob

まとめ

TalendでS3→ローカルへのファイル展開・統合処理を実施してみました.GUIでこういった処理を機械的に実行できるのは便利だなと感じました.今回は以上になります.


スポンサードリンク