今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、GCS(Google Cloud Storage)とBigQueryで連動する、作成したJava 1.8のプログラムの紹介を行う。
前提条件
以下の記事での環境構築が完了していること
GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...
また、以下の記事での「Eclipseのダウンロードと解凍」が完了していること
JavaでSQL Serverにデータロードするプログラムを作成した(環境構築編)今回も引き続き、Javaで作成したSQL Serverにデータロードするプログラムについて記載する。ここでは、JavaでSQL Serv...
作成したJava 1.8のプログラム
ここでは、作成したJava 1.8のプログラムを紹介する。
1) 以下の構成でのMavenプロジェクト「select_from_sales」を作成する

2) pom.xmlの内容は以下の通り
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test.bigquery.select</groupId>
<artifactId>select-from-sales</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- 文字コードとJavaのバージョンの設定 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<!-- プラグインの設定 -->
<build>
<plugins>
<!-- Javaファイルのコンパイラの設定 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- メインプログラムとして実行するクラスの指定 -->
<mainClass>test.TestBigQuery</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- ライブラリ依存関係の設定 -->
<dependencies>
<dependency>
<!-- BigQuery用ライブラリを追加する設定 -->
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>1.49.0</version>
</dependency>
<dependency>
<!-- JSON用ライブラリを追加する設定 -->
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20180813</version>
</dependency>
</dependencies>
</project>3) メインとなるTestBigQuery.javaは以下の通り。ここで、次項で述べる4)5)を順に呼び出している
package test;
public class TestBigQuery {
public static void main(String[] args) {
InsertIntoSales.main(args);
SelectFromSales.main(args);
}
}4) GCS上のCSVファイル(insert_bigquery_sales.csv)のデータを、BigQueryのsalesテーブルに追加する、InsertIntoSales.javaは以下の通り
package test;
import java.util.UUID;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.LoadJobConfiguration.Builder;
import com.google.cloud.bigquery.Table;
public class InsertIntoSales {
public static void main(String[] args) {
// ロードするGCS上のファイルとBigQuery上のデータセットとテーブルの設定
String sourceUri = "gs://test_purin_bucket/insert_bigquery_sales.csv";
String dataSet = "bigquery_purin_it";
String tableName = "sales";
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Table table = bigquery.getTable(dataSet, tableName);
// BigQuery上のテーブルにロードするジョブの設定
// ここでは、テーブルデータを全て削除してからロードしている
Builder loadConfig = LoadJobConfiguration.newBuilder(
table.getTableId(), sourceUri);
loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE);
com.google.cloud.bigquery.CsvOptions.Builder csvOptions
= CsvOptions.newBuilder();
csvOptions.setSkipLeadingRows(1);
loadConfig.setFormatOptions(csvOptions.build());
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job loadJob = bigquery.create(JobInfo.newBuilder(loadConfig.build())
.setJobId(jobId).build());
try {
// ジョブを実行し、終了を待つ
System.out.printf("Starting job %s\n", jobId.getJob());
loadJob = loadJob.waitFor();
// BigQuery salesテーブルからデータを抽出するジョブの
// エラーチェック
if (loadJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (loadJob.getStatus().getError() != null) {
throw new RuntimeException(loadJob.getStatus()
.getError().toString());
}
// ジョブの完了メッセージを出力
System.out.println("Job Finished.");
} catch (Exception e) {
System.out.println(e);
}
}
}5) BigQueryのsalesテーブルのデータを、JSONファイル(sales.json)に出力する、SelectFromSales.javaは以下の通り
package test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.UUID;
import org.json.JSONException;
import org.json.JSONObject;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
public class SelectFromSales {
public static void main(String[] args) {
// BigQuery salesテーブルからデータを抽出
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(
"SELECT "
+ " sale_date "
+ ", product_name "
+ ", place_name "
+ ", sales_amount "
+ "FROM `(プロジェクトID).bigquery_purin_it.sales` ")
.setUseLegacySql(false)
.build();
// BigQuery salesテーブルからデータを抽出するジョブを生成
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig)
.setJobId(jobId).build());
try {
// BigQuery salesテーブルからデータを抽出するジョブの終了を待機
queryJob = queryJob.waitFor();
// BigQuery salesテーブルからデータを抽出するジョブの
// エラーチェック
if (queryJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
throw new RuntimeException(queryJob.getStatus()
.getError().toString());
}
// 抽出した結果をJSONファイルに出力
TableResult result = queryJob.getQueryResults();
new SelectFromSales().putJsonFile(result);
System.out.println("JSONファイル出力完了");
} catch (Exception e) {
System.out.println(e);
}
}
private void putJsonFile(TableResult result)
throws JSONException, FileNotFoundException, IOException {
// 抽出した結果をJSON形式に変換
JSONObject jsonObj = new JSONObject();
int idx = 0;
for (FieldValueList row : result.iterateAll()) {
JSONObject jsonObjTmp = new JSONObject();
jsonObjTmp.put("sale_date"
, row.get("sale_date").getStringValue());
jsonObjTmp.put("product_name"
, row.get("product_name").getStringValue());
jsonObjTmp.put("place_name"
, row.get("place_name").getStringValue());
jsonObjTmp.put("sales_amount"
, row.get("sales_amount").getLongValue());
idx += 1;
jsonObj.put(String.valueOf(idx), jsonObjTmp);
}
// JSONファイルを出力
String jsonFilePath = "c:\\work\\gcp\\";
String jsonFileName = "sales.json";
PrintWriter pw = new PrintWriter(jsonFilePath + jsonFileName, "utf-8");
pw.write(jsonObj.toString(4));
pw.close();
}
}なお、mavenプロジェクトの作成・実行等については、以下の記事を参照のこと。
JavaでSQL Serverにデータロードするプログラムを作成した(ソースコード編)今回も引き続き、Javaで作成したSQL Serverにデータロードするプログラムについて記載する。ここでは、JavaでSQL Serv...
作成したJava1.8の実行結果
作成したJava1.8について、maven installにより作成された「select-from-sales-0.0.1-SNAPSHOT-jar-with-dependencies.jar」をコマンドプロンプト上で実行すると、以下のようになる。

なお、GCS上に配置したファイル、BigQuery上のテーブル、出力されたJSONファイルについては、以下の記事の通り。
GCSとBigQueryで連動するプログラムを作成してみた(1)GCS(Google Cloud Storage)とBigQueryで連動するプログラムを作成したので共有する。 GCSは、ファイ...





