これまでは、Spring BatchのTaskletモデルを利用してきたが、ファイルの読み込み/データの加工/DBへの書き込みといった処理の流れを定型化したChunkモデルという方式もある。
Chunkモデルの処理方式については、以下のサイトを参照のこと。
https://spring.pleiades.io/spring-batch/docs/current/reference/html/step.html#chunkOrientedProcessing
今回は、以前作成したプログラムをChunkモデルを利用する方式に変えてみたので、そのサンプルプログラムを共有する。
なお、今回はAzure Blob Storageのファイル読み込み時にSASトークンを発行しないと、ファイルの読み込みを行えないため、以下のサイトに記載されている、SASトークンの生成処理を利用している。
https://logico-jp.io/2021/01/21/create-a-service-sas-for-a-container-or-blob-storage-in-java/
前提条件
下記記事のサンプルプログラムを作成済であること。
また、読み込むCSVファイルの文字コードはUTF-8であるものとする。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムを、Chunkモデルを利用するように修正する。なお、下記の赤枠は、前提条件のプログラムから変更したり、追加したりしたプログラムである。

Chunkモデルの設定を行うクラスの内容は以下の通りで、ファイルの読み込み・DBへの書き込みや、バッチ処理単位(ステップ)やジョブの定義を行っている。
package com.example.batch;
import java.net.MalformedURLException;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.example.mybatis.model.UserData;
import com.example.util.DemoStringUtil;
import lombok.RequiredArgsConstructor;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {
public final JobBuilderFactory jobBuilderFactory;
public final StepBuilderFactory stepBuilderFactory;
public final SqlSessionFactory sqlSessionFactory;
private final DemoUpdateProcessor demoUpdateProcessor;
/** Azure Storageのアカウント名 */
@Value("${azure.storage.accountName}")
private String storageAccountName;
/** Azure Storageへのアクセスキー */
@Value("${azure.storage.accessKey}")
private String storageAccessKey;
/** Azure StorageのBlobコンテナー名 */
@Value("${azure.storage.containerName}")
private String storageContainerName;
/**
* BlobStorageからファイルを読み込む.
* @return 読み込みオブジェクト
*/
@Bean
public FlatFileItemReader<UserData> reader() {
FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
try {
// Blobストレージへの接続文字列
String storageConnectionString = "DefaultEndpointsProtocol=https;"
+ "AccountName=" + storageAccountName
+ ";" + "AccountKey=" + storageAccessKey + ";";
// Blobサービスクライアントの生成
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
.connectionString(storageConnectionString).buildClient();
// BlobサービスクライアントからSASを生成
String sas = DemoStringUtil.getServiceSasUriForBlob(blobServiceClient);
// BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
String url = blobServiceClient.getAccountUrl() + "/"
+ storageContainerName + "/user_data.csv"
+ "?" + sas;
reader.setResource(new UrlResource(url));
} catch (MalformedURLException ex) {
throw new RuntimeException(ex);
}
// ファイルを読み込む際の文字コード
reader.setEncoding("UTF-8");
// 1行目を読み飛ばす
reader.setLinesToSkip(1);
// ファイルから読み込んだデータをUserDataオブジェクトに格納
reader.setLineMapper(new DefaultLineMapper<UserData>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(new String[] { "id", "name", "birth_year"
, "birth_month", "birth_day", "sex", "memo" });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
{
setTargetType(UserData.class);
}
});
}
});
return reader;
}
/**
* 読み込んだファイルのデータを、DBに書き込む.
* @param dataSource データソース
* @return 書き込みオブジェクト
*/
@Bean
public MyBatisBatchItemWriter<UserData> writer(DataSource dataSource) {
return new MyBatisBatchItemWriterBuilder<UserData>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("com.example.mybatis.UserDataMapper.upsert")
.build();
}
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
* @param reader 読み込みオブジェクト
* @param writer 書き込みオブジェクト
* @return Spring Batchのジョブ内で指定する処理単位
*/
@Bean
public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) {
// 生成するステップ内で読み込み/加工/書き込みを一連の流れを指定する
// その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
return stepBuilderFactory.get("step")
.<UserData, UserData>chunk(3)
.reader(reader)
.processor(demoUpdateProcessor)
.writer(writer)
.build();
}
/**
* Spring Batchのジョブを定義する.
* @param jobListener 実行前後の処理(リスナ)
* @param step 実行するステップ
* @return Spring Batchのジョブ
*/
@Bean
public Job updateUserData(DemoJobListener jobListener, Step step) {
// 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
return jobBuilderFactory
.get("updateUserData")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.flow(step)
.end()
.build();
}
}また、読み込んだデータのチェックを行うクラスの内容は以下の通りで、エラー時はログ出力し、(DBへの書き込みをしないよう)NULLを返却するようにしている。
package com.example.batch;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import com.example.mybatis.model.UserData;
import com.example.util.DemoStringUtil;
@Component
public class DemoUpdateProcessor implements ItemProcessor<UserData, UserData> {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(DemoUpdateProcessor.class);
/**
* 読み込んだデータの加工を行う.
* ここでは、読み込んだデータのチェックを行い、エラーがあればNULLを、
* エラーがなければ引数の値をそのまま返す.
*/
@Override
public UserData process(UserData item) throws Exception {
String itemId = item.getId();
// 1列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(itemId)) {
LOGGER.info("1列目が空またはNULLです。");
return null;
}
// 1列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(itemId)) {
LOGGER.info("1列目が数値以外です。" + " 該当のid=" + itemId);
return null;
}
// 1列目の桁数が不正な場合はエラー
if (itemId.length() > 6) {
LOGGER.info("1列目の桁数が不正です。" + " 該当のid=" + itemId);
return null;
}
// 2列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getName())) {
LOGGER.info("2列目が空またはNULLです。" + " 該当のid=" + itemId);
return null;
}
// 2列目の桁数が不正な場合はエラー
if (item.getName().length() > 40) {
LOGGER.info("2列目の桁数が不正です。" + " 該当のid=" + itemId);
return null;
}
// 3列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getBirth_year())) {
LOGGER.info("3列目が空またはNULLです。" + " 該当のid=" + itemId);
return null;
}
// 3列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(item.getBirth_year())) {
LOGGER.info("3列目が数値以外です。" + " 該当のid=" + itemId);
return null;
}
// 3列目の桁数が不正な場合はエラー
if (item.getBirth_year().length() > 4) {
LOGGER.info("3列目の桁数が不正です。" + " 該当のid=" + itemId);
return null;
}
// 4列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getBirth_month())) {
LOGGER.info("4列目が空またはNULLです。" + " 該当のid=" + itemId);
return null;
}
// 4列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(item.getBirth_month())) {
LOGGER.info("4列目が数値以外です。" + " 該当のid=" + itemId);
return null;
}
// 4列目の桁数が不正な場合はエラー
if (item.getBirth_month().length() > 4) {
LOGGER.info("4列目の桁数が不正です。" + " 該当のid=" + itemId);
return null;
}
// 5列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getBirth_day())) {
LOGGER.info("5列目が空またはNULLです。" + " 該当のid=" + itemId);
return null;
}
// 5列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(item.getBirth_day())) {
LOGGER.info("5列目が数値以外です。"+ " 該当のid=" + itemId);
return null;
}
// 5列目の桁数が不正な場合はエラー
if (item.getBirth_day().length() > 2) {
LOGGER.info(" 5列目の桁数が不正です。"+ " 該当のid=" + itemId);
return null;
}
// 3列目・4列目・5列目から生成される日付が不正であればエラー
String birthDay = item.getBirth_year()
+ DemoStringUtil.addZero(item.getBirth_month())
+ DemoStringUtil.addZero(item.getBirth_day());
if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) {
LOGGER.info("3~5列目の日付が不正です。"+ " 該当のid=" + itemId);
return null;
}
// 6列目が1,2以外の場合はエラー
if (!("1".equals(item.getSex()))
&& !("2".equals(item.getSex()))) {
LOGGER.info("6列目の性別が不正です。"+ " 該当のid=" + itemId);
return null;
}
// 7列目の桁数が不正な場合はエラー
if (!StringUtils.isEmpty(item.getMemo())
&& item.getMemo().length() > 1024) {
LOGGER.info("7列目の桁数が不正です。"+ " 該当のid=" + itemId);
return null;
}
return item;
}
}さらに、ジョブの実行前後に処理を追加するクラスの内容は以下の通りで、@Componentアノテーションを付与してDemoChunkConfigクラスから参照できるようにしている。
package com.example.batch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;
import com.example.model.TimerTriggerParam;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@Component
public class DemoJobListener extends JobExecutionListenerSupport {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(DemoJobListener.class);
/**
* Spring Batchのジョブ実行前の処理を定義する.
*/
@Override
public void beforeJob(JobExecution jobExecution) {
super.beforeJob(jobExecution);
// Spring Batchのジョブ実行前の処理が呼び出されたことをログ出力する
printLog(jobExecution, "beforeJob");
}
/**
* Spring Batchのジョブ実行後の処理を定義する.
*/
@Override
public void afterJob(JobExecution jobExecution) {
super.afterJob(jobExecution);
// Spring Batchのジョブ実行後の処理が呼び出されたことをログ出力する
printLog(jobExecution, "afterJob");
}
/**
* ログ出力を行う.
* @param jobExecution ジョブ実行時の定義オブジェクト
* @param methodName メソッド名
*/
private void printLog(JobExecution jobExecution, String methodName) {
try {
String paramStr
= jobExecution.getJobParameters().getString("timerTriggerParam");
if(paramStr != null) {
TimerTriggerParam param = new ObjectMapper().readValue(
paramStr, new TypeReference<TimerTriggerParam>() {});
LOGGER.info("DemoJobListener " + methodName
+ " triggered: " + param.getTimerInfo());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}また、UserDataクラスの内容は以下の通りで、全項目を文字列型に変更している。
package com.example.mybatis.model;
import lombok.Data;
@Data
public class UserData {
/** ID */
private String id;
/** 名前 */
private String name;
/** 生年月日_年 */
private String birth_year;
/** 生年月日_月 */
private String birth_month;
/** 生年月日_日 */
private String birth_day;
/** 性別 */
private String sex;
/** メモ */
private String memo;
}さらに、pom.xmlの追加内容は以下の通りで、SASトークンを生成するための設定を追加している。
<!-- Azure StorageでSASトークンを利用するための設定 -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.10.0</version>
</dependency>また、文字列を編集するユーティリティクラスの内容は以下の通りで、SASトークンを生成するgetServiceSasUriForBlobメソッドを追加している。
package com.example.util;
import java.time.Duration;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.ResolverStyle;
import org.apache.commons.lang3.StringUtils;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.common.sas.AccountSasPermission;
import com.azure.storage.common.sas.AccountSasResourceType;
import com.azure.storage.common.sas.AccountSasService;
import com.azure.storage.common.sas.AccountSasSignatureValues;
public class DemoStringUtil {
/**
* DateTimeFormatterを利用して日付チェックを行う.
* @param dateStr チェック対象文字列
* @param dateFormat 日付フォーマット
* @return 日付チェック結果
*/
public static boolean isCorrectDate(String dateStr, String dateFormat) {
if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) {
return false;
}
// 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成
DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat)
.withResolverStyle(ResolverStyle.STRICT);
try {
// チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする
LocalDate.parse(dateStr, df);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 数値文字列が1桁の場合、頭に0を付けて返す.
* @param intNum 数値文字列
* @return 変換後数値文字列
*/
public static String addZero(String intNum) {
if (StringUtils.isEmpty(intNum)) {
return intNum;
}
if (intNum.length() == 1) {
return "0" + intNum;
}
return intNum;
}
/**
* 引数で指定したBlobのSASトークンを生成し返す.
* @param client Blobサービスクライアントオブジェクト
* @return SASトークン
*/
public static String getServiceSasUriForBlob(BlobServiceClient client) {
// SASトークンのアクセス権を読み取り可能に設定
AccountSasPermission permissions
= new AccountSasPermission().setReadPermission(true);
// SASトークンがBlobコンテナやオブジェクトにアクセスできるように設定
AccountSasResourceType resourceTypes
= new AccountSasResourceType().setContainer(true).setObject(true);
// SASトークンがBlobにアクセスできるように設定
AccountSasService services = new AccountSasService().setBlobAccess(true);
// SASトークンの有効期限を5分に設定
OffsetDateTime expiryTime = OffsetDateTime.now().plus(Duration.ofMinutes(5));
// SASトークンを作成
AccountSasSignatureValues sasValues = new AccountSasSignatureValues(
expiryTime, permissions, services, resourceTypes);
return client.generateAccountSas(sasValues);
}
}その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-spring-batch-chunk/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) エラーが発生しない場合のCSVファイル・ログ・取り込み後のDBの内容は以下の通りで、CSVファイルの内容が、SQLデータベース上のUSER_DATAテーブルに全て取り込まれたことが確認できる。


2) 赤枠でエラーが発生する場合のCSVファイル・ログ・取り込み後のDBの内容は以下の通りで、ログにエラー内容が出力され、SQLデータベース上のUSER_DATAテーブルに、エラーでないid=3のデータのみ取り込まれたことが確認できる。


要点まとめ
- Azure Functions上でSpring Batchを利用する際、TaskletモデルだけでなくChunkモデルも利用できる。
- Chunkモデルでは、ファイルの読み込み/データの加工/DBへの書き込みといった処理の流れを定型化している。
- ChunkモデルでAzure Blob Storageのファイル読み込む際は、SASトークンを発行する必要がある。





