Azure Function上で楽観ロックを実装する場合、DB接続する際のSqlSessionを生成する際に、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。
今回は、バッチモードとそれ以外で、DB接続設定を使い分けてみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
サンプルプログラムの作成
作成したサンプルプログラムの構成は、以下の通り。

なお、上記の赤枠は、今回追加・変更したプログラムである。
バッチ処理のDB接続設定の内容は以下の通りで、SQLセッションファクトリを生成する際に、バッチモードに設定する処理を追加している。
package com.example.config;
import org.apache.ibatis.mapping.Environment;
import org.springframework.context.annotation.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
@MapperScan(basePackages = {"com.example.mybatis.batch"}
, sqlSessionFactoryRef = "sqlSessionFactoryBatch")
public class DemoBatchDataSourceConfig {
/**
* バッチで利用するためのデータソースプロパティを生成する
* @return バッチで利用するためのデータソースプロパティ
*/
@Bean(name = {"datasourceBatchProperties"})
@Primary
@ConfigurationProperties(prefix = "spring.datasource")
public DataSourceProperties datasourceBatchProperties() {
return new DataSourceProperties();
}
/**
* バッチで利用するためのデータソースを生成する
* @param properties バッチで利用するためのデータソースプロパティ
* @return バッチで利用するためのデータソース
*/
@Bean(name = {"dataSourceBatch"})
@Primary
public DataSource datasourceBatch(
@Qualifier("datasourceBatchProperties") DataSourceProperties properties) {
return properties.initializeDataSourceBuilder().build();
}
/**
* バッチで利用するためのトランザクションマネージャを生成する
* @param dataSourceBatch バッチで利用するためのデータソース
* @return バッチで利用するためのトランザクションマネージャ
*/
@Bean(name = {"txManagerBatch"})
@Primary
public PlatformTransactionManager txManagerBatch(
@Qualifier("dataSourceBatch") DataSource dataSourceBatch) {
return new DataSourceTransactionManager(dataSourceBatch);
}
/**
* バッチで利用するためのSQLセッションファクトリを生成する
* @param dataSourceBatch バッチで利用するためのデータソース
* @return バッチで利用するためのSQLセッションファクトリ
* @throws Exception 任意例外
*/
@Bean(name = {"sqlSessionFactoryBatch"})
@Primary
public SqlSessionFactory sqlSessionFactory(
@Qualifier("dataSourceBatch") DataSource dataSourceBatch) throws Exception {
SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dataSourceBatch);
// MyBatisでバッチモードで処理できるよう設定を変更
Environment env = new Environment("development"
, new JdbcTransactionFactory(), dataSourceBatch);
org.apache.ibatis.session.Configuration config
= new org.apache.ibatis.session.Configuration(env);
config.setDefaultExecutorType(ExecutorType.BATCH);
sqlSessionFactory.setConfiguration(config);
return sqlSessionFactory.getObject();
}
}また、オンライン処理のDB接続設定の内容は以下の通りで、こちらはバッチモードの設定を含めていない。
package com.example.config;
import org.springframework.context.annotation.Configuration;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
@MapperScan(basePackages = {"com.example.mybatis.online"}
, sqlSessionFactoryRef = "sqlSessionFactoryOnline")
public class DemoOnlineDataSourceConfig {
/**
* オンラインで利用するためのデータソースプロパティを生成する
* @return オンラインで利用するためのデータソースプロパティ
*/
@Bean(name = {"datasourceOnlineProperties"})
@ConfigurationProperties(prefix = "spring.datasource.online")
public DataSourceProperties datasourceOnlineProperties() {
return new DataSourceProperties();
}
/**
* オンラインで利用するためのデータソースを生成する
* @param properties オンラインで利用するためのデータソースプロパティ
* @return オンラインで利用するためのデータソース
*/
@Bean(name = {"dataSourceOnline"})
public DataSource datasourceOnline(
@Qualifier("datasourceOnlineProperties") DataSourceProperties properties) {
return properties.initializeDataSourceBuilder().build();
}
/**
* オンラインで利用するためのトランザクションマネージャを生成する
* @param dataSourceOnline オンラインで利用するためのデータソース
* @return オンラインで利用するためのトランザクションマネージャ
*/
@Bean(name = {"txManagerOnline"})
public PlatformTransactionManager txManagerOnline(
@Qualifier("dataSourceOnline") DataSource dataSourceOnline) {
return new DataSourceTransactionManager(dataSourceOnline);
}
/**
* オンラインで利用するためのSQLセッションファクトリを生成する
* @param dataSourceOnline オンラインで利用するためのデータソース
* @return オンラインで利用するためのSQLセッションファクトリ
* @throws Exception 任意例外
*/
@Bean(name = {"sqlSessionFactoryOnline"})
public SqlSessionFactory sqlSessionFactory(
@Qualifier("dataSourceOnline") DataSource dataSourceOnline) throws Exception {
SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dataSourceOnline);
return sqlSessionFactory.getObject();
}
}さらに、application.propertiesの設定は以下の通りで、オンラインのDB接続設定を追加し、バッチモードの設定をコメントアウトしている。
# Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net azure.storage.containerName=blobcontainer # DB接続設定(バッチ) spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver # DB接続設定(オンライン) spring.datasource.online.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.online.username=purinit@azure-db-purinit spring.datasource.online.password=(DBのパスワード) spring.datasource.online.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver # MyBatisでバッチモードで処理できるよう設定を変更 #mybatis.executor-type=BATCH #mybatis.configuration.default-executor-type=BATCH
また、pom.xmlの内容は以下の通りで、@ConfigurationPropertiesアノテーションを利用するための設定を追加している。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>demoAzureFunc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Hello Spring Function on Azure</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<azure.functions.maven.plugin.version>1.9.0</azure.functions.maven.plugin.version>
<!-- customize those properties. The functionAppName should be unique across Azure -->
<functionResourceGroup>azureAppDemo</functionResourceGroup>
<functionAppName>azureFuncDemoApp</functionAppName>
<functionAppServicePlan>ASP-azureAppDemo-8679</functionAppServicePlan>
<functionPricingTier>B1</functionPricingTier>
<functionAppRegion>japaneast</functionAppRegion>
<stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>
<start-class>com.example.DemoAzureFunction</start-class>
<spring.boot.wrapper.version>1.0.25.RELEASE</spring.boot.wrapper.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-adapter-azure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-function-web</artifactId>
<scope>provided</scope>
</dependency>
<!-- lombokを利用するための設定 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!-- Spring Batchを利用するための設定 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Azure Storageの設定 -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.3.0</version>
</dependency>
<!-- Azure StorageでSASトークンを利用するための設定 -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.10.0</version>
</dependency>
<!-- SQL Serverを利用するための設定 -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</dependency>
<!-- MyBatisを利用するための設定 -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!-- @ConfigurationPropertiesアノテーションを利用するための設定 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-dependencies</artifactId>
<version>3.1.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<version>${azure.functions.maven.plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-functions-maven-plugin</artifactId>
<configuration>
<resourceGroup>${functionResourceGroup}</resourceGroup>
<appName>${functionAppName}</appName>
<appServicePlanName>${functionAppServicePlan}</appServicePlanName>
<region>${functionAppRegion}</region>
<pricingTier>${functionPricingTier}</pricingTier>
<runtime>
<os>Linux</os>
<javaVersion>8</javaVersion>
</runtime>
<appSettings>
<!-- Run Azure Function from package file by default -->
<property>
<name>WEBSITE_RUN_FROM_PACKAGE</name>
<value>1</value>
</property>
<property>
<name>FUNCTIONS_EXTENSION_VERSION</name>
<value>~3</value>
</property>
<property>
<name>FUNCTIONS_WORKER_RUNTIME</name>
<value>java</value>
</property>
</appSettings>
</configuration>
<executions>
<execution>
<id>package-functions</id>
<goals>
<goal>package</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources</id>
<phase>package</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<overwrite>true</overwrite>
<outputDirectory>
${project.build.directory}/azure-functions/${functionAppName}
</outputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/azure
</directory>
<includes>
<include>**</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${stagingDirectory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
</executions>
</plugin>
<!--Remove obj folder generated by .NET SDK in maven clean-->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<filesets>
<fileset>
<directory>obj</directory>
</fileset>
</filesets>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-thin-layout</artifactId>
<version>${spring.boot.wrapper.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/plugins-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/plugins-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/plugins-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/plugins-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>さらに、バッチ処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.batch」フォルダ下に設定している。
package com.example.mybatis.batch;
import org.apache.ibatis.annotations.Mapper;
import com.example.mybatis.model.UserData;
@Mapper
public interface UserDataMapperBatch {
/**
* DBにUserDataオブジェクトがあれば更新し、なければ追加する.
* @param userData UserDataオブジェクト
*/
void upsert(UserData userData);
/**
* 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する.
* @param id ID
* @return UserDataオブジェクト
*/
UserData findByIdRowLock(Integer id);
/**
* 引数のUserDataオブジェクトのバージョンを更新する.
* @param userData UserDataオブジェクト
*/
void updateVersion(UserData userData);
}<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mybatis.batch.UserDataMapperBatch">
<update id="upsert" parameterType="com.example.mybatis.model.UserData">
MERGE INTO USER_DATA AS u
USING ( SELECT #{id} id, #{version} version ) s
ON ( u.id = s.id )
WHEN MATCHED AND u.version = s.version THEN
UPDATE SET name = #{name}, birth_year = #{birth_year}
, birth_month = #{birth_month}, birth_day = #{birth_day}
, sex = #{sex}, memo = #{memo}, version = #{version} + 1
WHEN NOT MATCHED THEN
INSERT ( id, name, birth_year, birth_month, birth_day
, sex, memo, version )
VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day}
, #{sex}, #{memo}, #{version})
;
</update>
<select id="findByIdRowLock" parameterType="java.lang.Integer"
resultType="com.example.mybatis.model.UserData">
SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version
FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- Azure SQL Databaseで行ロックをかける -->
WHERE id = #{id}
</select>
<update id="updateVersion" parameterType="com.example.mybatis.model.UserData">
UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id}
</update>
</mapper>また、オンライン処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.online」フォルダ下に設定している。
package com.example.mybatis.online;
import org.apache.ibatis.annotations.Mapper;
import com.example.mybatis.model.UserData;
@Mapper
public interface UserDataMapperOnline {
/**
* 引数のIDをキーにもつUserDataオブジェクトのバージョン番号を更新する.
* @param id ID
* @return 更新件数
*/
int updateVersion(Integer id);
/**
* 引数のIDをキーにもつUserDataオブジェクトを取得する.
* @param id ID
* @return UserDataオブジェクト
*/
UserData findById(Integer id);
}<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mybatis.online.UserDataMapperOnline">
<update id="updateVersion" parameterType="java.lang.Integer">
UPDATE USER_DATA SET version = version + 1 WHERE id = #{id}
</update>
<select id="findById" parameterType="java.lang.Integer"
resultType="com.example.mybatis.model.UserData">
SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version
FROM USER_DATA
WHERE id = #{id}
</select>
</mapper>さらに、バッチ処理のMapperインタフェースを呼び出す箇所を、それぞれ変更後のパスに変更している。
package com.example.batch;
import java.net.MalformedURLException;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;
import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;
import lombok.RequiredArgsConstructor;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {
/** ジョブ生成ファクトリ */
public final JobBuilderFactory jobBuilderFactory;
/** ステップ生成ファクトリ */
public final StepBuilderFactory stepBuilderFactory;
/** SQLセッションファクトリ */
public final SqlSessionFactory sqlSessionFactoryBatch;
/** データ加工処理 */
private final DemoUpdateProcessor demoUpdateProcessor;
/** データ加工前後処理 */
private final DemoProcessorListener demoProcessorListener;
/** データ書き込み前後処理 */
private final DemoUpdateListener demoUpdateListener;
/** ステップの前後処理 */
private final DemoStepListener demoStepListener;
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/**
* BlobStorageからファイルを読み込む.
* @return 読み込みオブジェクト
*/
@Bean
public FlatFileItemReader<UserData> reader() {
FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
try {
// BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
String url = demoBlobService.getBlobUrl("user_data.csv");
reader.setResource(new UrlResource(url));
} catch (MalformedURLException ex) {
throw new RuntimeException(ex);
}
// ファイルを読み込む際の文字コード
reader.setEncoding("UTF-8");
// 1行目を読み飛ばす
reader.setLinesToSkip(1);
// ファイルから読み込んだデータをUserDataオブジェクトに格納
reader.setLineMapper(new DefaultLineMapper<UserData>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(new String[] { "id", "name", "birth_year"
, "birth_month", "birth_day", "sex", "memo" });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
{
setTargetType(UserData.class);
}
});
}
});
return reader;
}
/**
* 読み込んだファイルのデータを、DBに書き込む.
* @return 書き込みオブジェクト
*/
@Bean
public MyBatisBatchItemWriter<UserData> writer() {
return new MyBatisBatchItemWriterBuilder<UserData>()
.sqlSessionFactory(sqlSessionFactoryBatch)
.statementId("com.example.mybatis.batch.UserDataMapperBatch.upsert")
.assertUpdates(true) // 楽観ロックエラーを有効にする
.build();
}
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
* @param reader 読み込みオブジェクト
* @param writer 書き込みオブジェクト
* @return Spring Batchのジョブ内で指定する処理単位
*/
@Bean
public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) {
// 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の
// 一連の流れを指定する
// その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
return stepBuilderFactory.get("step")
.<UserData, UserData>chunk(3)
.reader(reader)
.processor(demoUpdateProcessor)
.listener(demoProcessorListener)
.writer(writer)
.listener(demoUpdateListener)
.listener(demoStepListener)
.build();
}
/**
* Spring Batchのジョブを定義する.
* @param jobListener 実行前後の処理(リスナ)
* @param step 実行するステップ
* @return Spring Batchのジョブ
*/
@Bean
public Job updateUserData(DemoJobListener jobListener, Step step) {
// 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
return jobBuilderFactory
.get("updateUserData")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.flow(step)
.end()
.build();
}
}package com.example.batch;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.example.mybatis.batch.UserDataMapperBatch;
import com.example.mybatis.model.UserData;
@Component
public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> {
@Autowired
private UserDataMapperBatch userDataMapperBatch;
/**
* データ加工前の処理を定義する.
*/
@Override
public void beforeProcess(UserData item) {
// 何もしない
}
/**
* データ加工後の処理を定義する.
*/
@Override
@Transactional
public void afterProcess(UserData item, UserData result) {
// 処理結果がnullの場合は、処理を終了する
if(result == null) {
return;
}
// 既に登録済データのVERSIONを取得する
Integer id = Integer.parseInt(result.getId());
UserData ud = userDataMapperBatch.findByIdRowLock(id);
// バージョンの値を結果に設定する
if(ud != null) {
result.setVersion(ud.getVersion());
} else {
result.setVersion(0);
}
// id=8の場合、バージョンを更新し、楽観ロックエラーにする
/* if(id == 8) {
userDataMapperBatch.updateVersion(result);
} */
}
/**
* データ編集エラー後の処理を定義する.
*/
@Override
public void onProcessError(UserData item, Exception e) {
// 何もしない
}
}また、オンライン処理用のHandler、Service、Param、Resultをそれぞれ追加している。
package com.example;
import java.util.Optional;
import org.springframework.cloud.function.adapter.azure.FunctionInvoker;
import com.example.model.OnlineServiceParam;
import com.example.model.OnlineServiceResult;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.HttpStatus;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
public class OnlineHandler extends FunctionInvoker<OnlineServiceParam, OnlineServiceResult> {
/**
* HTTP要求に応じて、HelloFunctionクラスのonlineメソッドを呼び出し、その戻り値をボディに設定したレスポンスを返す
* @param request リクエストオブジェクト
* @param context コンテキストオブジェクト
* @return レスポンスオブジェクト
*/
@FunctionName("online")
public HttpResponseMessage execute(@HttpTrigger(name = "request", methods = HttpMethod.GET
, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request
, ExecutionContext context) {
// リクエストパラメータからidの値を取得
String paramId = request.getQueryParameters().get("id");
// オンラインサービス呼出用Paramを生成
OnlineServiceParam onlineServiceParam = new OnlineServiceParam();
onlineServiceParam.setId(paramId);
// handleRequestメソッド内でHelloFunctionクラスのonlineメソッドを呼び出し、
// その戻り値をボディに設定したレスポンスを、JSON形式で返す
return request.createResponseBuilder(HttpStatus.OK)
.body(handleRequest(onlineServiceParam, context))
.header("Content-Type", "text/json").build();
}
}package com.example.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.example.model.OnlineServiceParam;
import com.example.model.OnlineServiceResult;
import com.example.mybatis.model.UserData;
import com.example.mybatis.online.UserDataMapperOnline;
@Service
public class OnlineService {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER = LoggerFactory.getLogger(OnlineService.class);
@Autowired
private UserDataMapperOnline userDataMapperOnline;
/**
* 指定されたIDのバージョンを更新し、更新後データを返却するサービス.
* @param onlineServiceParam オンラインサービス呼出用Param
* @return オンラインサービスの処理結果
*/
public OnlineServiceResult online(OnlineServiceParam onlineServiceParam) {
OnlineServiceResult result = new OnlineServiceResult();
Integer tmpId = null;
// 引数のIDが数値でなければ、処理を終了
try {
tmpId = Integer.parseInt(onlineServiceParam.getId());
}catch(Exception ex) {
return result;
}
// 指定されたIDのバージョンを更新
int updCnt = userDataMapperOnline.updateVersion(tmpId);
LOGGER.info("更新ID : " + tmpId + ", 更新件数 : " + updCnt);
// 指定されたIDのUserDataオブジェクトを返却
UserData userData = userDataMapperOnline.findById(tmpId);
if(userData == null) {
userData = new UserData();
}
result.setUserData(userData.toString());
return result;
}
}package com.example.model;
import lombok.Data;
@Data
public class OnlineServiceParam {
/** ID */
private String id;
}package com.example.model;
import lombok.Data;
@Data
public class OnlineServiceResult {
/** ユーザ情報 */
private String userData;
}さらに、Functionのメインクラスに、関数onlineを追加している。
package com.example;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.example.model.OnlineServiceParam;
import com.example.model.OnlineServiceResult;
import com.example.model.TimerTriggerParam;
import com.example.model.TimerTriggerResult;
import com.example.service.OnlineService;
import com.example.service.TimerTriggerService;
@SpringBootApplication
public class DemoAzureFunction {
/** タイマートリガーのテストを行うサービスクラスのオブジェクト */
@Autowired
private TimerTriggerService timerTriggerService;
/** オンライン処理を行うサービスクラスのオブジェクト */
@Autowired
private OnlineService onlineService;
public static void main(String[] args) throws Exception {
SpringApplication.run(DemoAzureFunction.class, args);
}
/**
* タイマートリガーのテストを行い結果を返却する関数
* @return タイマートリガーのテストを行うサービスクラスの呼出結果
*/
@Bean
public Function<TimerTriggerParam, TimerTriggerResult> timerTriggerTest() {
return timerTriggerParam -> timerTriggerService.timerTriggerTest(timerTriggerParam);
}
/**
* オンライン処理を行い結果を返却する関数
* @return オンライン処理を行うサービスクラスの呼出結果
*/
@Bean
public Function<OnlineServiceParam, OnlineServiceResult> online(){
return onlineServiceParam -> onlineService.online(onlineServiceParam);
}
}その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-batch-online/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。
2) バッチ処理の実行結果は、以下の記事の「サンプルプログラムの実行結果」を参照のこと。
3) OnlineHandlerクラスのAPIを呼び出す前のデータベースの状態は、以下の通り。
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC

4) 「https://azurefuncdemoapp.azurewebsites.net/api/online?id=5」とアクセスし、OnlineHandlerクラスのAPIを呼び出すと、以下の戻り値が返却される。

なお、「https://azurefuncdemoapp.azurewebsites.net」の部分は、Azure Portalで確認した、Azure Functionsの以下の赤枠のURLから確認できる。

5) OnlineHandlerクラスのAPIを呼び出した後のデータベースの状態は以下の通りで、id=5のVERSIONが1増加していることが確認できる。
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC

6) OnlineHandlerクラスのAPIを呼び出した際の、ログの出力結果は以下の通りで、ID=5を更新した際の更新件数が1であることが確認できる。

要点まとめ
- Azure Function上で楽観ロックを実装する際、DB接続する際のSqlSessionを生成する際、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。その使い分けをするには、DB接続設定を分ければよい。





