Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)は、StepExecutionListenerインタフェースのafterStepメソッドやbeforeStepメソッドを利用することで、処理単位(Tasklet)間でデータの受け渡しを行うことができる。
今回は、処理単位(Tasklet)間でデータの受け渡しを行ってみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
また、以下の記事のPostmanをインストール済であること。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムに、処理単位(Tasklet)間でデータの受け渡し処理を追加する。なお、下記の赤枠は、前提条件のプログラムから追加・変更したプログラムである。

新設したユーティリティクラスの内容は以下の通りで、ジョブパラメータからEventGridTriggerParamオブジェクトを生成している。
package com.example.batch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import com.example.model.EventGridTriggerParam;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
public class DemoTaskletUtil {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(DemoTaskletUtil.class);
/**
* Spring Batchのジョブで渡されたパラメータから、
* EventGridTriggerParamオブジェクトを生成する.
* @param stepExecution ジョブステップ実行定義
* @return EventGridTriggerParamオブジェクト
*/
public static EventGridTriggerParam getEventGridTriggerParam(
StepExecution stepExecution) {
// Spring Batchのジョブで渡されたパラメータを取得し、
// EventGridTriggerParamオブジェクトを生成する
String paramStr = stepExecution.getJobParameters()
.getString("eventGridTriggerParam");
EventGridTriggerParam param = null;
if (paramStr != null) {
try {
param = new ObjectMapper().readValue(paramStr
, new TypeReference<EventGridTriggerParam>() { });
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
throw new RuntimeException(ex);
}
}
return param;
}
}DemoTasklet.javaの内容は以下の通りで、StepExecutionListenerインタフェースを実装し、afterStepメソッド内で次のTaskletにデータ(inputFileName)を渡すようにしている。
package com.example.batch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.model.EventGridTriggerParam;
import com.example.service.DemoBatchService;
import org.springframework.batch.core.step.tasklet.Tasklet;
@Component
public class DemoTasklet implements Tasklet, StepExecutionListener {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(DemoTasklet.class);
/**
* BlobStorageからファイル(user_data.csv)を読み込み、
* USER_DATAテーブルに書き込むサービス
*/
@Autowired
private DemoBatchService demoBatchService;
/**
* Spring Batchのジョブ実行前の処理を定義する.
*/
@Override
public void beforeStep(StepExecution stepExecution) {
// Spring Batchのジョブで渡されたパラメータを取得し、
// 処理が呼び出されたことをログ出力する
EventGridTriggerParam param
= DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
LOGGER.info("DemoTasklet beforeStep " + " triggered: " + param.getTimerInfo());
}
/**
* Spring Batchのジョブ内での処理を定義する.
*/
@Override
public RepeatStatus execute(StepContribution contribution
, ChunkContext chunkContext) throws Exception {
// Spring Batchのジョブで渡されたパラメータを取得し、
// 処理が呼び出されたことをログ出力する
EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(
chunkContext.getStepContext().getStepExecution());
LOGGER.info("DemoTasklet execute " + " triggered: " + param.getTimerInfo());
// BlobStorageから指定したファイルを読み込み、
// USER_DATAテーブルに書き込むサービスを呼び出す
demoBatchService.readUserData(param.getFileName());
// ジョブが終了したことを返す
return RepeatStatus.FINISHED;
}
/**
* Spring Batchのジョブ実行後の処理を定義する.
*/
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
// Spring Batchのジョブで渡されたパラメータを取得し、
// 処理が呼び出されたことをログ出力する
EventGridTriggerParam param
= DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
// Spring Batchのジョブ内での処理が呼び出されたことをログ出力する
LOGGER.info("DemoTasklet afterStep " + " triggered: " + param.getTimerInfo());
// ファイル名を次のタスクレットに渡し、実行後ジョブが終了したことを返す
stepExecution.getJobExecution().getExecutionContext()
.put("inputFileName", param.getFileName());
return ExitStatus.COMPLETED;
}
}DemoTaskletLog.javaの内容は以下の通りで、StepExecutionListenerインタフェースを実装し、beforeStepメソッド内で前のTaskletからのデータ(inputFileName)を受け取るようにしている。
package com.example.batch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;
import com.example.model.EventGridTriggerParam;
@Component
public class DemoTaskletLog implements Tasklet, StepExecutionListener {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER = LoggerFactory.getLogger(DemoTaskletLog.class);
/**
* Spring Batchのジョブ実行前の処理を定義する.
*/
@Override
public void beforeStep(StepExecution stepExecution) {
// Spring Batchのジョブで渡されたパラメータを取得し、
// 処理が呼び出されたことをログ出力する
EventGridTriggerParam param
= DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
LOGGER.info("DemoTaskletLog beforeStep " + " triggered: " + param.getTimerInfo());
// タスクレットに渡されたファイル名を取得する
String inputFileName = (String) stepExecution.getJobExecution()
.getExecutionContext().get("inputFileName");
LOGGER.info("inputFileName: " + inputFileName);
}
/**
* Spring Batchのジョブ内でのログ出力処理を定義する.
*/
@Override
public RepeatStatus execute(StepContribution contribution
, ChunkContext chunkContext) throws Exception {
// Spring Batchのジョブで渡されたパラメータを取得し、
// 処理が呼び出されたことをログ出力する
EventGridTriggerParam param = DemoTaskletUtil
.getEventGridTriggerParam(chunkContext.getStepContext().getStepExecution());
LOGGER.info("DemoTaskletLog execute " + " triggered: " + param.getTimerInfo());
// ジョブが終了したことを返す
return RepeatStatus.FINISHED;
}
/**
* Spring Batchのジョブ実行後の処理を定義する.
*/
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
// Spring Batchのジョブで渡されたパラメータを取得し、
// 処理が呼び出されたことをログ出力する
EventGridTriggerParam param
= DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
LOGGER.info("DemoTaskletLog afterStep " + " triggered: " + param.getTimerInfo());
// 実行後ジョブが終了したことを返す
return ExitStatus.COMPLETED;
}
}その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-tasklet-put-data/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) 以下の記事の「EventGridのローカル環境での実行」に記載した通りに実行する。
実行した結果のログは以下の通りで、DemoTaskletLogクラスのinputFileNameに、取り込んだCSVファイル名「user_data.csv」が設定されていることが確認できる。

2) それぞれ500件のデータを含む取込ファイル「user_data_1.csv」「user_data_2.csv」を用意し、1)と同じ方法で、2ファイルを並行して(2スレッドが起動した状態で)取り込んだ結果のログは以下の通りで、DemoTaskletLogクラスのinputFileNameに、取り込んだCSVファイル名「user_data_1.csv」「user_data_2.csv」がそれぞれ設定されていることが確認できる。

なお、500件のデータは、以下の記事のプログラムを利用して作成している。
要点まとめ
- Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)は、StepExecutionListenerインタフェースのafterStepメソッドやbeforeStepメソッドを利用することで、処理単位(Tasklet)間でデータの受け渡しを行うことができる。






