以下の記事で、Spring BatchのChunkモデルを利用したバッチで、DBのデータをAzure Blob StorageにCSVファイルを出力する処理を実装していたが、このときは、Azure Functions(Linux)上の、「/home/tmp_user_data.csv」に中間ファイルを出力していた。
今回は、同様の処理を、中間ファイルを利用せずに、Spring BatchのChunkモデルを利用して、DBのデータをAzure Blob StorageにCSVファイルを出力してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
サンプルプログラムの作成
作成したサンプルプログラムの構成は、以下の通り。

なお、上記の赤枠は、今回追加・変更したプログラムである。
pom.xml、local.settings.jsonは、以下の記事の「作成したサンプルプログラムの内容(Azure Functions)」と同様の修正を行っている。
また、CSVファイルをAzure Blob Storageに出力する処理は、ItemWriterインタフェースを継承した以下のクラスで実装している。
package com.example.batch;
import java.util.List;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;
import com.example.util.DemoStringUtil;
@Component
public class DemoItemWriter implements ItemWriter<UserData> {
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/** 取得したUSER_DATAテーブルのデータを出力するBLOB名 */
@Value("${blob.name}")
private String blobName;
/** 改行コード名 */
@Value("${line.sep.name}")
private String lineSepName;
/** CSV書き込み方式 */
@Value("${csv.write.method}")
private String csvWriteMethod;
/**
* 読み込んだDBのデータを、BLOB(CSVファイル)に書き込む処理を定義する.
*/
@Override
public void write(List<? extends UserData> items) {
// 書き込みデータを格納する文字列
StringBuilder sb = new StringBuilder();
// 改行コードを変換
String lineSeparator = DemoStringUtil.getLineSepCode(lineSepName);
// CSV書き込み方式が一括(ALL)の場合
if ("ALL".equals(csvWriteMethod)) {
// ヘッダー行を、書き込みデータに追加
String header = "id,name,birth_year,birth_month,birth_day,sex,memo";
sb.append(header + lineSeparator);
}
// 読み込んだDBのデータ各行を、書き込みデータに追加
for(UserData item : items) {
// ID
sb.append(item.getId());
sb.append(",");
// 名前
sb.append(DemoStringUtil.addDoubleQuote(item.getName()));
sb.append(",");
// 生年月日_年
sb.append(item.getBirth_year());
sb.append(",");
// 生年月日_月
sb.append(item.getBirth_month());
sb.append(",");
// 生年月日_日
sb.append(item.getBirth_day());
sb.append(",");
// 性別
sb.append(DemoStringUtil.addDoubleQuote(item.getSex()));
sb.append(",");
// メモ
sb.append(DemoStringUtil.addDoubleQuote(item.getMemo()));
// 改行コード
sb.append(lineSeparator);
}
// CSV書き込み方式に応じて、書き込みデータを、BLOB(CSVファイル)に書き込む
if ("ALL".equals(csvWriteMethod)) {
demoBlobService.writeBlockBlob(blobName, sb.toString());
} else {
demoBlobService.writeAppendBlob(blobName, sb.toString());
}
}
}さらに、ItemWriterインタフェースを継承したクラスは、以下のSpring Batch定義クラスで呼び出している。
package com.example.batch;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.example.mybatis.model.UserData;
import lombok.RequiredArgsConstructor;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {
/** ジョブ生成ファクトリ */
public final JobBuilderFactory jobBuilderFactory;
/** ステップ生成ファクトリ */
public final StepBuilderFactory stepBuilderFactory;
/** SQLセッションファクトリ */
public final SqlSessionFactory sqlSessionFactory;
/** ステップの前後処理 */
private final DemoStepListener demoStepListener;
/** データ書き込み処理 */
private final DemoItemWriter demoItemWriter;
/** チャンクサイズ */
@Value("${chunk.size}")
private String chunkSize;
/**
* MyBatisPagingItemReaderを使ってUSER_DATAデータをページを分けて取得する.
* @return 読み込みオブジェクト
*/
@Bean
public MyBatisPagingItemReader<UserData> reader(){
return new MyBatisPagingItemReaderBuilder<UserData>()
.sqlSessionFactory(sqlSessionFactory)
.queryId("com.example.mybatis.UserDataMapper.findAll")
.pageSize(Integer.parseInt(chunkSize))
.build();
}
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
* @param reader 読み込みオブジェクト
* @param writer 書き込みオブジェクト
* @return Spring Batchのジョブ内で指定する処理単位
*/
@Bean
public Step step(MyBatisPagingItemReader<UserData> reader
, ItemWriter<UserData> writer) {
// 生成するステップ内で読み込み/書き込みを一連の流れを指定する
// その際、読み込みデータの加工は行わず、
// チャンクサイズ(=何件読み込む毎にコミットするか)を指定している
return stepBuilderFactory.get("step")
.<UserData, UserData>chunk(Integer.parseInt(chunkSize))
.reader(reader)
.writer(demoItemWriter)
.listener(demoStepListener)
.build();
}
/**
* Spring Batchのジョブを定義する.
* @param jobListener 実行前後の処理(リスナ)
* @param step 実行するステップ
* @return Spring Batchのジョブ
*/
@Bean
public Job updateUserData(DemoJobListener jobListener, Step step) {
// 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
return jobBuilderFactory
.get("selectUserData")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.flow(step)
.end()
.build();
}
}また、Spring Batchのジョブ内で指定する処理単位(ステップ)実行前後の処理を記載したクラスの内容は以下の通りで、Blob Storageのファイル削除やCSVファイルヘッダーの書き込みを行っている。
package com.example.batch;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.example.service.DemoBlobService;
import com.example.util.DemoStringUtil;
@Component
public class DemoStepListener implements StepExecutionListener {
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/** 取得したUSER_DATAテーブルのデータを出力するBLOB名 */
@Value("${blob.name}")
private String blobName;
/** 改行コード名 */
@Value("${line.sep.name}")
private String lineSepName;
/** CSV書き込み方式 */
@Value("${csv.write.method}")
private String csvWriteMethod;
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)実行前の処理を定義する.
*/
@Override
public void beforeStep(StepExecution stepExecution) {
// Blobファイル(BlockBlob,追加Blob)があれば削除
demoBlobService.deleteBlockBlob(blobName);
demoBlobService.deleteAppendBlob(blobName);
// 改行コードを変換
String lineSeparator = DemoStringUtil.getLineSepCode(lineSepName);
// CSV書き込み方式が分割(DIV)の場合
if ("DIV".equals(csvWriteMethod)) {
// Blobファイルを作成後、ヘッダー行を出力
String header = "id,name,birth_year,birth_month,birth_day,sex,memo";
demoBlobService.writeAppendBlob(blobName, header + lineSeparator);
}
}
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)実行後の処理を定義する.
*/
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return ExitStatus.COMPLETED;
}
}さらに、Blob Storageにアクセスするクラスの内容は以下の通りで、Blob Storageへのファイルを書き込みや削除を行っている。
package com.example.service;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudAppendBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import io.netty.util.internal.StringUtil;
@Service
public class DemoBlobService {
/** Azure Storageのアカウント名 */
@Value("${azure.storage.accountName}")
private String storageAccountName;
/** Azure Storageへのアクセスキー */
@Value("${azure.storage.accessKey}")
private String storageAccessKey;
/** Azure StorageのBlobコンテナー名 */
@Value("${azure.storage.containerName}")
private String storageContainerName;
/** 文字コード */
@Value("${character.code}")
private String characterCode;
// Blobストレージへの接続文字列
private String storageConnectionString = null;
/**
* Blobストレージへの接続文字列の初期化処理
*/
@PostConstruct
public void init() {
// Blobストレージへの接続文字列を設定
storageConnectionString = "DefaultEndpointsProtocol=https;"
+ "AccountName=" + storageAccountName + ";"
+ "AccountKey=" + storageAccessKey + ";";
}
/**
* 引数で指定されたファイル(BlockBlob)に、引数で指定されたメッセージを書き込む.
* @param fileName ファイル名(BlockBlob)
* @param message メッセージ
*/
public void writeBlockBlob(String fileName, String message) {
// 未指定の項目がある場合、何もしない
if(StringUtil.isNullOrEmpty(storageAccountName)
|| StringUtil.isNullOrEmpty(storageAccessKey)
|| StringUtil.isNullOrEmpty(storageContainerName)
|| StringUtil.isNullOrEmpty(fileName)
|| StringUtil.isNullOrEmpty(message)) {
return;
}
// ファイルアップロード処理
// Blob内のコンテナー内の指定したファイル(BlockBlob)に、指定したメッセージを書き込む
try {
CloudBlockBlob cbb = getContainer().getBlockBlobReference(fileName);
cbb.deleteIfExists();
BlobRequestOptions options = new BlobRequestOptions();
options.setAbsorbConditionalErrorsOnRetry(true);
cbb.uploadText(message, characterCode, null, options, null);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* 引数で指定されたBlob(BlockBlob)が存在する場合、削除する.
* @param blobName Blob名(BlockBlob)
*/
public void deleteBlockBlob(String fileName) {
// 未指定の項目がある場合、何もしない
if(StringUtil.isNullOrEmpty(storageAccountName)
|| StringUtil.isNullOrEmpty(storageAccessKey)
|| StringUtil.isNullOrEmpty(storageContainerName)
|| StringUtil.isNullOrEmpty(fileName)) {
return;
}
// Blob内のコンテナー内の指定したファイル(BlockBlob)が存在する場合、削除する
try {
CloudBlockBlob cbb = getContainer().getBlockBlobReference(fileName);
cbb.deleteIfExists();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* 引数で指定されたファイル(追加Blob)に、引数で指定されたメッセージを追加する.
* @param fileName ファイル名(追加Blob)
* @param message メッセージ
*/
public void writeAppendBlob(String fileName, String message) {
// 未指定の項目がある場合、何もしない
if(StringUtil.isNullOrEmpty(storageAccountName)
|| StringUtil.isNullOrEmpty(storageAccessKey)
|| StringUtil.isNullOrEmpty(storageContainerName)
|| StringUtil.isNullOrEmpty(fileName)
|| StringUtil.isNullOrEmpty(message)) {
return;
}
// ファイルアップロード処理
// Blob内のコンテナー内の指定したファイル(追加Blob)に、指定したメッセージを追記モードで書き込む
try {
CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName);
if (!cab.exists()) {
cab.createOrReplace();
}
BlobRequestOptions options = new BlobRequestOptions();
options.setAbsorbConditionalErrorsOnRetry(true);
cab.appendText(message, characterCode, null, options, null);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* 引数で指定されたファイル(追加Blob)が存在する場合、削除する.
* @param fileName ファイル名(追加Blob)
*/
public void deleteAppendBlob(String fileName) {
// 未指定の項目がある場合、何もしない
if(StringUtil.isNullOrEmpty(storageAccountName)
|| StringUtil.isNullOrEmpty(storageAccessKey)
|| StringUtil.isNullOrEmpty(storageContainerName)
|| StringUtil.isNullOrEmpty(fileName)) {
return;
}
// Blob内のコンテナー内の指定したファイル(追加Blob)が存在する場合、削除する
try {
CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName);
cab.deleteIfExists();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* Blobストレージのコンテナを取得する.
* @return Blobストレージのコンテナ
* @throws Exception
*/
private CloudBlobContainer getContainer() throws Exception {
// ストレージアカウントオブジェクトを取得
CloudStorageAccount storageAccount
= CloudStorageAccount.parse(storageConnectionString);
// Blobクライアントオブジェクトを取得
CloudBlobClient blobClient
= storageAccount.createCloudBlobClient();
// Blob内のコンテナーを取得
CloudBlobContainer container
= blobClient.getContainerReference(storageContainerName);
return container;
}
}また、application.propertiesに、CSVファイル出力設定を追加している。
# Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net azure.storage.containerName=blobcontainer # DB接続設定 spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver ## ファイル出力設定 # チャンクサイズ chunk.size=300000 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=ALL
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-to-csv-direct/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、下記記事の「サンプルプログラムの実行結果」と同じ結果となる。
ただし、CSV書き込み方式をDIV(分割)に設定した場合、以下のように、CSVファイルが、追加BLOBとして出力されることが確認できる。

また、CSV書き込み方式をALL(一括)に設定した場合、以下のように、CSVファイルが、ブロックBLOBとして出力されることが確認できる。

さらに、取得元のDB(Azure SQL Database上のUSER_DATAテーブル)に30万件のデータを入れた場合の性能は、以下の通り。
1) 下記記事の「テストデータ作成」に従って、30万件のデータを作成する。
2) application.propertiesを、以下のように、チャンクサイズ=10000、CSV書き込み方式=DIV(分割)に変更する。
## ファイル出力設定 # チャンクサイズ chunk.size=10000 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=DIV
3) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。
4) バッチ実行時のログ出力内容は以下の通りで、83902ms(約1分24秒)で処理が完了していることが確認できる。

なお、上記ログの確認手順は、以下のサイトを参照のこと。
なお、上記CPU・メモリ使用率の確認手順は、以下のサイトを参照のこと。
6) application.propertiesを、以下のように、チャンクサイズ=2300に変更し、サンプルプログラムをAzure Functionsにデプロイする。
## ファイル出力設定 # チャンクサイズ chunk.size=2300 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=DIV
7) バッチ実行時のログ出力内容は以下の通りで、76713ms(約1分17秒)で処理が完了していることが確認できる。

9) application.propertiesを、以下のように、チャンクサイズ=300000、CSV書き込み方式=ALL(一括)に変更し、サンプルプログラムをAzure Functionsにデプロイする。
## ファイル出力設定 # チャンクサイズ chunk.size=300000 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=ALL
10) バッチ実行時のログ出力内容は以下の通りで、68092ms(約1分8秒)で処理が完了していることが確認できる。

11) バッチ実行時のCPU・メモリ使用率は以下の通りで、メモリ使用率がかなり上昇していることが確認できる。

要点まとめ
- Spring BatchのChunkモデルを利用したバッチ処理でも、ItemWriterインタフェースを継承したクラスを利用することで、中間ファイルを利用せずに、DBのデータをAzure Blob StorageにCSVファイルを出力できる。







