こんにちは。那須野拓実です。
以前書いた「Dataformを使ってBigQueryにあるGA4データを加工してLooker Studioで可視化してみた話」という記事でDataformの基本的な使い方をまとめた際、その中で簡易的なSQLも紹介していました。ただ、実務を考えるとちょっと物足りないというのが正直なところでした。
Dataformをデータ分析の実務で使えるようにするには、データ分析のためのSQLの基本的な書き方を押さえていくことが必須になります。
今回は、小売業の売上データを分析することを想定し、サンプルデータに対して全ての処理をSQLで書いたうえで、データ処理の型としてまとめてみたいと思います。なお、元となる各データソースはGoogleスプレッドシートに置いており、Googleスプレッドシートを参照して外部テーブルを生成するところからスタートする前提で進めます。
元となるデータソースのサンプルは以下の5テーブルです。小売業の売上データになりますが、EC主体で、店頭でのビジネスも小規模ながら展開しているビジネスをイメージしています。
データ処理の中では、ECと店頭にまたがるデータを統合し、顧客それぞれについて購入金額や購入頻度、購入商品カテゴリー、値引き感応度などを集計して、最終的にマーケティング観点での顧客セグメントを割り当てるところまでをやりたいと思います。
データリネージ(データの処理の流れ)は、テーブル単位で書くと以下のようなイメージです。
注文単位で生成されるorder、そしてorderの中の細かい商品明細の情報を持つorder_lineの2種類について、ECと店頭にまたがるデータを統合し、最終的に両データを集計のうえ顧客セグメントを定義する流れになります。
詳細はGitHubのリポジトリに譲りますが、SQLとしては特殊な構文はそこまで使っておらず、SELECT句、WHERE句、GROUP BY句、ORDER BY句、JOIN句、WITH句あたりが使えれば基本的には大丈夫です。ただし、以下2点については留意をしたうえで次の章に進みましょう。
教科書通りにはいかない、というのが現場のデータ処理ですね…。
それでは、GitHubのリポジトリにあるコードを参考に、Dataformによるデータ処理あるあるの型を見ていきたいと思いますこれが全てではないですが、これらを押さえれば最低限の基本にはなるのかなと思います。
なお、元データとなるスプレッドシートもセットで公開しているので、併せてご利用ください。それでは見ていきましょう。
Dataformはconfig.typeによって基本的なデータ処理を簡易に実装できるようになっており、そのうちの1つがテーブルの新規作成になります。config.type:'table'にて宣言すると、SELECT文を書くだけで自動的にテーブルを作ってくれるというものです。
コツは、カラム名を定義する場合はconfig.columnsに設定すること、そして他のテーブルを参照するときは ${ref('ec_product')} のように他のSQLXファイルで定義したテーブルのnameをrefによる参照記法の中に入れることですね。
例えばproductのテーブル作成は、以下のようなSQLで実装しています。ちなみにちょっとしたデータ加工として頻出するのが、以下のようなWHERE句やORDER BY句です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
config { type: 'table', schema: 'sales_sample_output', name: 'product', description: '商品一覧', columns: { product_id: '商品ID', product_name: '商品名', product_category: '商品カテゴリ', product_price: '商品価格', product_average_cost: '商品平均原価' }, tags: ['01_clensing_from_outer_tables'] } SELECT product_id, product_name, product_category, product_price, product_average_cost FROM ${ref('ec_product')} WHERE product_id IS NOT NULL ORDER BY product_id ASC |
なお、このconfig.type:'table'は、カラムのデータ型が自動定義される点には注意が必要です。カラムのデータ型を間接的に定義したい場合は、SELECT文の中で意図的にキャスト(型変換)するか、もしくは後述するconfig.type:'operations'で対応する方がよいでしょう。
BigQueryの集計と言えば、まずはGROUP BY句でしょう。トランザクション系のデータを、様々な単位で集約・集計することができます。グループ化するキーとなるカラムはGROUP BYの後に記述しつつ、それ以外のカラムは集約関数(MIN, MAX, SUM, COUNTなど)に入れておきます。
今回例に挙げている購買データであれば、顧客単位に集約する集計が頻出します。例えばcustomer_order_totalのテーブルです。
GROUP BY句を実際に書くときは、他の句とセットで使うことが多いですが、WHERE句の後、ORDER BY句の前に書くと覚えておきましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
config { type: 'table', schema: 'sales_sample_output', name: 'customer_order_total', description: '顧客注文(累計)', tags: ['02_creating_from_inner_tables'] } SELECT customer_id, MIN(create_date) AS first_purchase_date, MAX(create_date) AS last_purchase_date, SUM(price) AS total_purchase_amount, COUNT(unified_order_id) AS total_purchase_count, SUM(total_discount) AS total_purchase_discount, ROUND(SUM(total_discount) / SUM(price),3) AS total_discount_rate FROM ${ref('order')} WHERE customer_id IS NOT NULL GROUP BY customer_id |
※参照することの少ない中間テーブルなので、カラムのdescriptionは省略しています。これぐらいの行数だとシンプルで見やすいですね。
GROUP BY句と同じくらい多いデータ処理の1つが、LEFT JOIN句によるマスタ結合だと思います。そもそもJOIN句は、LEFT JOIN以外にもいくつか種類はありますが、ここではLEFT JOINだけを紹介します。気になる方は調べてみてください。
データベースでは「誰が、いつ、どこで、何を、どうした」という履歴であるトランザクションデータを保持しますが、多くの場合、この「誰が」「どこで」「何を」といったデータはIDだけを持ち、そのIDに紐づく静的な詳細データを別のマスタテーブルに保持します。
例えば購買データであれば、商品IDに紐づく形で商品情報を整理し、それを商品一覧のテーブルとして独立して管理しがちです。結果、購買履歴のテーブルには「何を買ったか」を表す商品IDはあるものの、その商品名や商品カテゴリ、定価や原価などは商品一覧のテーブルを参照する必要が出てきます。
データ分析の際にはそういった情報を付与したうえでデータを見ることが必要不可欠なので、LEFT JOIN句は非常に重要になっています。
具体例として、shop_order_line_with_discountのSQLを見てみます。先ほどと同じようにconfig.type:'table'でテーブル作成していますが、SELECT文の中にLEFT JOINが入っていますね。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
config { type: 'table', schema: 'sales_sample_output', name: 'shop_order_line_with_discount', description: '店頭注文明細(値引き情報再現)', columns: { receipt_id: 'レシートID', order_line_index: '明細番号', create_date: '日付', product_name: '商品名', product_id: '商品ID', product_category: '商品カテゴリ', quantity: '数量', line_price: '明細価格', line_discount: '明細値引き額', line_price_before_discount: '値引き前明細価格', customer_id: '顧客ID' }, bigquery: { partitionBy: 'create_date' }, tags: ['01_clensing_from_outer_tables'] } SELECT s.receipt_id, s.order_line_index, s.created_at AS create_date, s.product_name, p.product_id, p.product_category, s.quantity, s.price AS line_price, s.quantity * p.product_price - s.price AS line_discount, s.quantity * p.product_price AS line_price_before_discount, s.customer_id FROM ${ref('shop_order_line')} s LEFT JOIN ${ref('product')} p ON s.product_name = p.product_name |
ポイントとして、LEFT JOINをする際はSQLの中に複数のテーブルが登場するため、どのテーブルのカラムか区別するために、カラムを{table_name}.{column_name}で明示的に定義するようにしておきましょう。
なお、{table_name}は文字数が長い場合が多いため、SQL上では省略することが多いです。テーブル名の頭文字を使ったり、a,b,cのようなアルファベットの始めの文字を使ったりすることが多いです。
LEFT JOIN句に次いで頻出するのが、UNION ALLによるトランザクションの統合です。複数のテーブルに分かれている同型のトランザクションを、カラム構成を整えつつ統合します。
具体例として、orderテーブルのSQL(※後述する差分同期処理の一部なので留意のこと)を抜粋します。この例では、店頭の売上データとECの売上データを統合して1つのテーブルにしています。
カラム構成が一致していることが必須であるため、差分がある場合は以下のように各カラムを整えてあげる必要があります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
CREATE OR REPLACE TABLE `sales_sample_intermediate.order_incremental` AS ( -- 店頭データを生成 SELECT CONCAT('S_',receipt_id) AS unified_order_id, create_date, NULL AS create_datetime, customer_id, price, price_before_discount, total_discount, total_line_discount, order_discount, '店頭' AS sales_channel FROM ${ref('shop_order')} WHERE create_date ${dataform.projectConfig.vars.EQUALS_CALC_DATETIME} -- ECデータを結合 UNION ALL SELECT CONCAT('E_',order_id) AS unified_order_id, DATE(created_at) AS create_date, created_at AS create_datetime, customer_id, price, price_before_discount, total_discount, total_line_discount, order_discount, 'EC' AS sales_channel FROM ${ref('ec_order')} WHERE created_at ${dataform.projectConfig.vars.EQUALS_CALC_DATETIME} -- マージした後にソート ORDER BY create_date ASC, customer_id ASC ); |
押さえておきたいポイントは、ソートするタイミングでしょう。UNION ALLで足した後に、最後に全体に対してソートを効かせるような置き方なので注意しておきましょう。
トランザクション系のデータは日々増えていくため、全数のデータを定期的にクレンジング対象や集計対象にしていたらBigQueryのスキャン量、つまり請求額が加速度的に増えていきます。そのため、一定以上の量(初期のボリュームないし肥大化速度の観点で)があるテーブルについては、更新されたデータだけを処理する差分処理の実装が重要になってきます。
Dataformでも差分処理はサポートされており、config.type:'incremental'によって簡単に実装可能なのですが、大きな落とし穴があります。というのも、これはいわゆるupsertのみ(updateとinsert)が対象となっており、元データがdeleteされたときは処理の対象外となっているのです。
もちろん実行時オプションで [完全に更新して実行する] にチェックを入れることで全データを再計算して元テーブルのdeleteされたレコードもケアすることは可能ですが、元データのdeleteが定期的に起こる運用においてはあまり望ましい処理ではありません。
deleteも含めてリアルタイムな同期をということであれば、累積テーブル、差分テーブルをSQLで作り、MERGE句によってINSERT, UPDATE, DELETEそれぞれの場合を律儀に実装する必要があります。幸い、Dataformではconfig.type:'operations'によって自由にSQLを書けるので、以下のような実装が可能になっています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
config { type: 'operations', schema: 'sales_sample_output', name: 'order', description: '注文一覧(店頭とECを統合)', hasOutput: true, tags: ['01_clensing_from_outer_tables'] } -- (1/3) 初回の場合、累積テーブルを作成 CREATE TABLE IF NOT EXISTS `sales_sample_output.order` ( unified_order_id STRING OPTIONS(description='統合注文ID'), create_date DATE OPTIONS(description='日付'), create_datetime DATETIME OPTIONS(description='日時'), customer_id STRING OPTIONS(description='顧客ID'), price INT64 OPTIONS(description='価格'), price_before_discount INT64 OPTIONS(description='値引き前価格'), total_discount INT64 OPTIONS(description='値引き額合計'), total_line_discount INT64 OPTIONS(description='明細値引き額合計'), order_discount INT64 OPTIONS(description='注文値引き額'), sales_channel STRING OPTIONS(description='販売チャネル') ) PARTITION BY create_date OPTIONS ( description = '注文一覧' ); -- (2/3) 更新期間内の差分テーブルを作成 CREATE OR REPLACE TABLE `sales_sample_intermediate.order_incremental` AS ( -- 店頭データを生成 SELECT CONCAT('S_',receipt_id) AS unified_order_id, create_date, NULL AS create_datetime, customer_id, price, price_before_discount, total_discount, total_line_discount, order_discount, '店頭' AS sales_channel FROM ${ref('shop_order')} WHERE create_date ${dataform.projectConfig.vars.EQUALS_CALC_DATETIME} -- ECデータを結合 UNION ALL SELECT CONCAT('E_',order_id) AS unified_order_id, DATE(created_at) AS create_date, created_at AS create_datetime, customer_id, price, price_before_discount, total_discount, total_line_discount, order_discount, 'EC' AS sales_channel FROM ${ref('ec_order')} WHERE created_at ${dataform.projectConfig.vars.EQUALS_CALC_DATETIME} -- マージした後にソート ORDER BY create_date ASC, customer_id ASC ); -- (3/3) 差分テーブルを累積テーブルにマージ MERGE `sales_sample_output.order` AS t USING `sales_sample_intermediate.order_incremental` AS s ON t.unified_order_id = s.unified_order_id WHEN NOT MATCHED BY SOURCE AND t.create_date ${dataform.projectConfig.vars.EQUALS_CALC_DATETIME} THEN DELETE WHEN MATCHED THEN UPDATE SET t.create_date = s.create_date, t.create_datetime = s.create_datetime, t.customer_id = s.customer_id, t.price = s.price, t.price_before_discount = s.price_before_discount, t.total_discount = s.total_discount, t.total_line_discount = s.total_line_discount, t.order_discount = s.order_discount, t.sales_channel = s.sales_channel WHEN NOT MATCHED THEN INSERT ( unified_order_id, create_date, create_datetime, customer_id, price, price_before_discount, total_discount, total_line_discount, order_discount, sales_channel ) VALUES ( s.unified_order_id, s.create_date, s.create_datetime, s.customer_id, s.price, s.price_before_discount, s.total_discount, s.total_line_discount, s.order_discount, s.sales_channel ); |
ただし、見ての通り、かなり長くなってしまいます。なんと111行です…。
そのため、operationsで律儀に書くか、incrementalによる実装ができるかどうかは、運用としてどこまで許容できるかの観点で慎重に検討しておいたほうがよさそうです。もしincrementalで実装できる場合、今回の例で言えば111行だったコードが以下のように69行に激減するのは無視できない魅力です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
config { type: 'incremental', schema: 'sales_sample_output', name: 'order_incremental', description: '注文一覧(店頭とECを統合)', columns: { unified_order_id:' 統合注文ID', create_date: '日付', create_datetime: '日時', customer_id: '顧客ID', price: '価格', price_before_discount: '値引き前価格', total_discount: '値引き額合計', total_line_discount: '明細値引き額合計', order_discount: '注文値引き額', sales_channel: '販売チャネル' }, uniqueKey: ["unified_order_id"], bigquery: { partitionBy: "create_date", updatePartitionFilter: "create_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)" } } pre_operations { DECLARE create_date_checkpoint DEFAULT ( ${when(incremental(), `SELECT DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)`, `SELECT DATE(2001,1,1)`)} ) } -- 店頭データを生成 SELECT CONCAT('S_',receipt_id) AS unified_order_id, create_date, NULL AS create_datetime, customer_id, price, price_before_discount, total_discount, total_line_discount, order_discount, '店頭' AS sales_channel FROM ${ref('shop_order')} WHERE create_date >= create_date_checkpoint -- ECデータを結合 UNION ALL SELECT CONCAT('E_',order_id) AS unified_order_id, DATE(created_at) AS create_date, created_at AS create_datetime, customer_id, price, price_before_discount, total_discount, total_line_discount, order_discount, 'EC' AS sales_channel FROM ${ref('ec_order')} WHERE DATE(created_at) >= create_date_checkpoint -- マージした後にソート ORDER BY create_date ASC, customer_id ASC |
複雑なワークフローを組む際は、同じコードを使い回したいことが出てくると思います。また、特定の部分だけ書き換えて実行したいという場合も出てくると思います。そういった場合に有効なのが、projectConfig.varsによって定義されるコンパイル変数になります。
前項のorderテーブルを作るSQLにて登場していた
1 |
WHERE create_date ${dataform.projectConfig.vars.EQUALS_CALC_DATETIME} |
という表現が1例です。コンパイル変数の特徴として、
という2点があり、うまく組み合わせることで融通の利く開発と実行が可能になります。
1つ目のデフォルト値については、例えば今回のプロジェクトであればworkflow_settings.yamlにて以下のようにデフォルトの集計期間をBETWEEN句で定義しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
defaultProject: {bigquery_project_id} defaultLocation: {bigquery_project_location} defaultDataset: {bigquery_dataset_name} defaultAssertionDataset: dataform_assertions dataformCoreVersion: 3.0.0 # プロジェクトの共通パラメータ(コンパイル時に上書きしうる) vars: # ソースとなるスプレッドシートのID SOURCE_SPREADSHEET_ID: '1nShXon7tEcSpm1jyVwl-ouYVAhpcln3Nzwuwhu1TiFI' # 各種集計パラメータ EQUALS_CALC_DATETIME: BETWEEN '2001-01-01' AND '2025-12-31' ONE_YEAR_AGO: DATE(2024,1,1) YESTERDAY: DATE(2024,12,31) |
上記はサンプルなので日付をハードコーディングしていますが、実際の運用ではDATE_SUB関数やDATE_TRUNC関数を使うといろいろなパターンが実装できて便利ですね。(※データの性質によって微妙に変更が必要な場合はあるのでご留意ください。)
そして2つ目のコンパイル時の上書きについてです。 コンパイルするときに以下のように [コンパイル変数] を入力することができるようになっています。
なので、
のように入力してリリース構成に「直近1週間の集計」のように名前を付けて保存をすることで、全集計ではなく直近1週間だけのデータを更新するリリース構成にすることができます。
この組み合わせによって、
のように任意の期間での再集計が可能になります。実務上は集計条件を微妙に変えて実行したいケースが多いですが、そのときに原本のコードを都度変更するのはリスクがあります。運用にて変えうる部分はコンパイル変数にしてリリース構成で上書きできる設計にしておくとよいでしょう。
Dataformの特長として、CREATE TEMP FUNCTIONメソッドによってJavaScriptを使ってデータ処理を実装して複雑な集計を実現できる点があります。
CREATE TEMP FUNCTION自体がSQLのため、config.type:'operations'による実装にはなりますが、トランザクションをもとに集約する場合や、FORループを使いたい場合、多段階の変数処理が入る場合などはJavaScriptによる実装の方が有利です。
今回のデータパイプラインではcustmerテーブルにて各種顧客セグメントを作る際に利用しており、実際のコードが以下になります。
全体感としては、CREATE TEMP FUNCTIONで複数の関数を定義したうえで、最後にCREATE OR REPLACE TABLEメソッド1本で集計しています。(※押さえておきたいコツが多いので、それはコードの後で説明します。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 |
config { type: 'operations', schema: 'sales_sample_output', name: 'customer', description: '顧客一覧', hasOutput: true, tags: ['02_creating_from_inner_tables'] } -- JavaScript functionsを使うため、やむなくoperationsで作成 -- RFMのRecency計算 CREATE TEMP FUNCTION get_age( date_of_birth DATE, yesterday DATE ) RETURNS STRING LANGUAGE js AS r""" const milli_seconds = 24 * 60 * 60 * 1000; const year_diff = Math.floor((yesterday - date_of_birth) / milli_seconds / 365); return Math.floor(year_diff / 10) + '0代'; """; -- RFMのRecency計算 CREATE TEMP FUNCTION get_purchase_recency_segment( last_purchase_date DATE, yesterday DATE ) RETURNS STRING LANGUAGE js AS r""" const milli_seconds = 24 * 60 * 60 * 1000; const date_diff = Math.floor((yesterday - last_purchase_date) / milli_seconds); if (date_diff <= 30){ return 'R1_30日以内'; } else if (date_diff <= 90){ return 'R2_90日以内'; } else if (date_diff <= 180){ return 'R3_180日以内'; } else if (date_diff <= 365){ return 'R4_1年以内'; } return 'R5_1年よりも前' """; -- RFMのFreqency計算 CREATE TEMP FUNCTION get_purchase_frequency_segment( one_year_purchase_count INT64 ) RETURNS STRING LANGUAGE js AS r""" if (one_year_purchase_count >= 12){ return 'F1_毎月'; } else if (one_year_purchase_count >= 4){ return 'F2_2~3か月に1回'; } else if (one_year_purchase_count >= 2){ return 'F3_半年に1回'; } else if (one_year_purchase_count >= 1){ return 'F4_年に1回'; } return 'F5_年1回未満' """; -- RFMのMonetary計算 CREATE TEMP FUNCTION get_purchase_monetary_segment( total_purchase_amount INT64 ) RETURNS STRING LANGUAGE js AS r""" if (total_purchase_amount >= 1000000){ return 'M1_100万円以上'; } else if (total_purchase_amount >= 100000){ return 'M2_10万円以上'; } else if (total_purchase_amount >= 10000){ return 'M3_1万円以上'; } else if (total_purchase_amount >= 1000){ return 'M4_千円以上'; } return 'M5_千円未満'; """; -- 購入商品セグメント CREATE TEMP FUNCTION get_purchase_product_segment( product_summary ARRAY<STRUCT<create_date DATE, product_category STRING>> ) RETURNS STRING LANGUAGE js AS r""" /* 直近5個の購入商品カテゴリーで最も多い2カテゴリーを抽出 */ /* なお同率の場合は、より直近のカテゴリーを優先 */ /* まずはカウント */ product_summary = product_summary.slice(0,5); const result = Object.values( product_summary.reduce((acc, {create_date, product_category}) => { if (!acc[product_category]) { acc[product_category] = {category: product_category, date: create_date, count: 0}; } acc[product_category].count += 1; return acc; }, {}) ); /* 上位2カテゴリーを抽出し、カテゴリー組み合わせがユニークになるよう昇順ソートしたうえで文字列結合して返す */ return result .sort((a, b) => b.count - a.count || new Date(b.date) - new Date(a.date)) .slice(0, 2) .sort((a, b) => { if (a.category > b.category) return 1; if (a.category < b.category) return -1; return 0; }) .map(a => a.category) .join('+'); """; -- 購入経路セグメント CREATE TEMP FUNCTION get_purchase_channel_segment( channel_summary ARRAY<STRUCT<create_date DATE, sales_channel STRING>> ) RETURNS STRING LANGUAGE js AS r""" /* 単純集計してECと店頭の比率のみでセグメント分けする */ /* まずはカウントして、比率を出しておく */ const counts = channel_summary.reduce((acc, record) => { const key = record.sales_channel; if (acc[key]) { acc[key] += 1; } else { acc[key] = 1; } return acc; }, {}); const total_count = Object.values(counts).reduce((sum, count) => sum + count, 0); const shop_ratio = counts['店頭'] / total_count; const ec_ratio = counts['EC'] / total_count; /* 比率に応じてセグメントを設定 */ if (shop_ratio == 1){ return '01_店頭のみ'; } else if (shop_ratio >= 0.7){ return '02_店頭中心'; } else if (ec_ratio == 1){ return '05_ECのみ'; } else if (ec_ratio >= 0.7){ return '04_EC中心'; } return '03_店頭・EC混在'; """; -- 値引き反応率セグメント CREATE TEMP FUNCTION get_discount_response_segment( one_year_discount_rate FLOAT64 ) RETURNS STRING LANGUAGE js AS r""" if (one_year_discount_rate > 0.1){ return 'D5_10%以上'; } else if (one_year_discount_rate > 0.05){ return 'D4_5%以上'; } else if (one_year_discount_rate > 0.03){ return 'D3_3%以上'; } else if (one_year_discount_rate > 0){ return 'D2_3%未満'; } return 'D1_値引きなし'; """; -- RFM計算 CREATE TEMP FUNCTION get_marketing_segment( frequency STRING, monetary STRING, one_year_discount_rate FLOAT64 ) RETURNS STRING LANGUAGE js AS r""" const f = frequency.slice(0,2); const m = monetary.slice(0,2); if (m == 'M1'){ return '01_プレミアム'; // 累計100万円以上 } else if (f == 'F1'){ return '02_ロイヤル'; // 毎月購入あり(≒累計10万円以上) } else if (f == 'F2' || f == 'F3'){ if (one_year_discount_rate >= 0.1){ return '05_ディールハンター'; } else if (m == 'M2'){ return '03_ライトロイヤル'; // 数か月に1回購入×累計10万円以上 } else { return '04_カジュアル'; // 数か月に1回購入×累計10万円未満 } } return '06_ノンアクティブ'; // 年に1回購入以下 """; CREATE OR REPLACE TABLE `sales_sample_output.customer` ( -- ID customer_id STRING OPTIONS(description='顧客ID'), -- 基本属性 gender STRING OPTIONS(description='性別'), age STRING OPTIONS(description='年代'), prefecture STRING OPTIONS(description='都道府県'), region STRING OPTIONS(description='地域'), -- 集計値 first_purchase_date DATE OPTIONS(description='初回購入日'), last_purchase_date DATE OPTIONS(description='最終購入日'), total_purchase_amount INT64 OPTIONS(description='累計購入金額'), total_purchase_count INT64 OPTIONS(description='累計購入回数'), total_purchase_discount INT64 OPTIONS(description='累計値引き額'), total_discount_rate FLOAT64 OPTIONS(description='累計平均値引き率'), one_year_purchase_amount INT64 OPTIONS(description='年あたり購入金額'), one_year_purchase_count INT64 OPTIONS(description='年あたり購入回数'), one_year_purchase_discount INT64 OPTIONS(description='年あたり値引き額'), one_year_discount_rate FLOAT64 OPTIONS(description='年あたり平均値引き率'), -- 嗜好性セグメント purchase_product_segment STRING OPTIONS(description='購入商品セグメント'), purchase_channel_segment STRING OPTIONS(description='購入経路セグメント'), discount_response_segment STRING OPTIONS(description='値引き感応度セグメント'), -- 購買セグメント(RFM) purchase_recency_segment STRING OPTIONS(description='R.最終購入日セグメント'), purchase_frequency_segment STRING OPTIONS(description='F.購入頻度セグメント'), purchase_monetary_segment STRING OPTIONS(description='M.購入金額セグメント'), -- 総合セグメント marketing_segment STRING OPTIONS(description='マーケティングセグメント'), ) OPTIONS ( description='顧客一覧' ) AS ( WITH temp AS ( SELECT -- ID t.customer_id, -- 基本属性 c.gender, get_age(c.date_of_birth, ${dataform.projectConfig.vars.YESTERDAY}) AS age, c.prefecture, c.region, -- 集計値 t.first_purchase_date, t.last_purchase_date, t.total_purchase_amount, t.total_purchase_count, t.total_purchase_discount, t.total_discount_rate, s.total_purchase_amount AS one_year_purchase_amount, s.total_purchase_count AS one_year_purchase_count, s.total_purchase_discount AS one_year_purchase_discount, s.total_discount_rate AS one_year_discount_rate, -- 嗜好性セグメント get_purchase_product_segment(d.product_summary) AS purchase_product_segment, get_purchase_channel_segment(d.channel_summary) AS purchase_channel_segment, get_discount_response_segment(s.total_discount_rate) AS discount_response_segment, -- 購買セグメント(RFM) get_purchase_recency_segment(t.last_purchase_date, ${dataform.projectConfig.vars.YESTERDAY}) AS purchase_recency_segment, get_purchase_frequency_segment(s.total_purchase_count) AS purchase_frequency_segment, get_purchase_monetary_segment(t.total_purchase_amount) AS purchase_monetary_segment FROM ${ref('customer_order_total')} t LEFT JOIN ${ref('customer_order_one_year_subset')} s ON t.customer_id = s.customer_id LEFT JOIN ${ref('customer_order_line_detail')} d ON t.customer_id = d.customer_id LEFT JOIN ${ref('ec_customer')} c ON t.customer_id = c.customer_id ) SELECT *, get_marketing_segment( purchase_frequency_segment, purchase_monetary_segment, one_year_discount_rate ) AS marketing_segment FROM temp ORDER BY customer_id ASC ); |
まず、JavaScript関数の定義の仕方を確認しましょう。もっともシンプルなget_age関数を見ると、引数であるdate_of_birthとyesterdayを型とセットで定義のうえ、戻り値をSTRINGにしてjs(JavaScript)で書いています。独特な感じはありますが、慣れればあとはJavaScriptを書くだけなので分かりやすいです。
1 2 3 4 5 6 7 8 9 10 11 |
CREATE TEMP FUNCTION get_age( date_of_birth DATE, yesterday DATE ) RETURNS STRING LANGUAGE js AS r""" const milli_seconds = 24 * 60 * 60 * 1000; const year_diff = Math.floor((yesterday - date_of_birth) / milli_seconds / 365); return Math.floor(year_diff / 10) + '0代'; """; |
実際に関数を使う場合も、以下のようによくある関数的な書き方で通ります。(コンパイル変数については前項を確認ください。)
1 2 3 4 5 6 |
SELECT … get_age(c.date_of_birth, ${dataform.projectConfig.vars.YESTERDAY}) AS age, … FROM … |
ちなみに、引数に配列を設定することももちろん可能です。ただし、配列を定義する書き方に癖があるので、丁寧めに確認していきましょう。まず関数の定義では、以下のように
ARRAY<STRUCT<{カラムをカンマ区切りでデータ型とともに並べる}>>
のように記述することで、いわゆるトランザクションの配列を引数にすることができます。以下の例では、product_summaryを配列として引数に設定しています。JavaScriptの中は通常のJavaScriptの書き方で大丈夫なので安心ですね。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
CREATE TEMP FUNCTION get_purchase_product_segment( product_summary ARRAY<STRUCT<create_date DATE, product_category STRING>> ) RETURNS STRING LANGUAGE js AS r""" /* 直近5個の購入商品カテゴリーで最も多い2カテゴリーを抽出 */ /* なお同率の場合は、より直近のカテゴリーを優先 */ /* まずはカウント */ product_summary = product_summary.slice(0,5); const result = Object.values( product_summary.reduce((acc, {create_date, product_category}) => { if (!acc[product_category]) { acc[product_category] = {category: product_category, date: create_date, count: 0}; } acc[product_category].count += 1; return acc; }, {}) ); /* 上位2カテゴリーを抽出し、カテゴリー組み合わせがユニークになるよう昇順ソートしたうえで文字列結合して返す */ return result .sort((a, b) => b.count - a.count || new Date(b.date) - new Date(a.date)) .slice(0, 2) .sort((a, b) => { if (a.category > b.category) return 1; if (a.category < b.category) return -1; return 0; }) .map(a => a.category) .join('+'); """; |
配列を引数にする場合は、実際に関数を使う場合の引数の作り方も特殊です。
今から確認する例では、customerテーブルを作る前にcustomer_order_line_detailというテーブルとして事前に作っているのですが、その中でARRAY_AGG関数とSTRUCT関数によってトランザクションデータを配列として1つのカラムproduct_summaryに集約しています。集約なので、GROUP BYを忘れずに書きます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
config { type: 'table', schema: 'sales_sample_output', name: 'customer_order_line_detail', description: '顧客注文明細(詳細)', tags: ['02_creating_from_inner_tables'] } SELECT customer_id, ARRAY_AGG(STRUCT(create_date,product_category) ORDER BY create_date DESC) AS product_summary, ARRAY_AGG(STRUCT(create_date,sales_channel) ORDER BY create_date DESC) AS channel_summary FROM ${ref('order_line')} WHERE customer_id IS NOT NULL GROUP BY customer_id |
ここまでできれば、実際に関数を使うときはシンプルです。
1 2 3 4 5 6 7 8 9 |
SELECT … get_purchase_product_segment(d.product_summary) AS purchase_product_segment, … FROM ${ref('customer_order_total')} t … LEFT JOIN ${ref('customer_order_line_detail')} d ON t.customer_id = d.customer_id |
この処理の基本を頭に入れてもらってから改めてcustomerテーブルのSQL全体を見ると、JavaScriptによるデータ加工の流れが見えてくると思います。
ひと工夫が必要ではありますが、トランザクションをもとに集約する場合や、FORループを使いたい場合、多段階の変数処理が入る場合などはJavaScriptによる実装の方が有利です。応用編として、ぜひ覚えておきましょう。
元のデータソースに立ち戻ります。今回はスプレッドシートをデータソースにしていますが、スプレッドシートを集計対象とするにはBigQueryの外部テーブルとして作っておく必要があります。もちろんBigQuery上の手作業で外部テーブルを作成することもできますが、再現性の観点ではDataform上で実装できていたほうがよいでしょう。
処理としては、CREATE OR REPLACE EXTERNAL TABLEのメソッドを使います。コツとしては、
を押さえておきたいところです。以下にec_customerテーブルのSQLを転載します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
config { type: 'operations', schema: 'sales_sample_sources', name: 'ec_customer', description: 'ECの顧客一覧(スプレッドシート参照)', hasOutput: true, tags: ['99_sources'] } CREATE OR REPLACE EXTERNAL TABLE `sales_sample_sources.ec_customer` ( customer_id STRING OPTIONS(description='顧客ID'), gender STRING OPTIONS(description='性別'), date_of_birth DATE OPTIONS(description='誕生日'), prefecture STRING OPTIONS(description='都道府県'), region STRING OPTIONS(description='地域') ) OPTIONS ( description = 'EC顧客一覧(スプレッドシート参照)', format = 'GOOGLE_SHEETS', uris = ['https://docs.google.com/spreadsheets/d/${dataform.projectConfig.vars.SOURCE_SPREADSHEET_ID}'], sheet_range = "'ec_customer'!A:E", skip_leading_rows = 1 ); |
BigQueryはデータ処理のエンジンが非常に強力なため、どんなにテーブルが大きくても数秒~数十秒くらいでSQLを実行しきってしまいます。しかし、従量課金のBigQueryはSELECTのためにスキャンした行と列のデータサイズに応じた課金が発生するため、むやみやたらと大きなテーブルを作って集計すると数十万~数百万円の課金が発生して「BigQueryで〇〇万円溶かした人」みたいな悲劇が発生しがちです。
大きくなりがちなテーブルの筆頭は時系列データをどんどん蓄積していくトランザクション系テーブルですが、スキャン量を減らすためにテーブルを分けるというのは本末転倒です。この問題に対してGoogleが提供している仕組みが、パーティション分割テーブルないしパーティショニングという仕組みです。
テーブルを日付カラムをもとに疑似的に分割し、SELECTの際にWHERE句でその日付カラムによって絞ることで、スキャン対象の行を絞ることができます。この仕組みを使うと、例えば直近7日間のデータのみを毎日再集計するような仕組みが可能のため、スキャンするデータサイズを桁違いに減らすことができます。
今まで登場したすべてのトランザクション系テーブルでしれっと設定していますが、実はその定義の方法が2パターンに分かれています。というのも、config.type:'table'の場合と、config.type:'operations'の場合で書き方が違うんです。
config設定にて定義できます。具体的には、config.bigquery.partitionByにて設定が可能です。以下の例は、shop_order_line_with_discountの抜粋です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
config { type: 'table', schema: 'sales_sample_output', name: 'shop_order_line_with_discount', description: '店頭注文明細(値引き情報再現)', columns: { receipt_id: 'レシートID', order_line_index: '明細番号', create_date: '日付', product_name: '商品名', product_id: '商品ID', product_category: '商品カテゴリ', quantity: '数量', line_price: '明細価格', line_discount: '明細値引き額', line_price_before_discount: '値引き前明細価格', customer_id: '顧客ID' }, bigquery: { partitionBy: 'create_date' }, tags: ['01_clensing_from_outer_tables'] } |
テーブル作成の際に、カラム定義を書いた後にPARTITION BY句を入れることで設定できます。以下の例はec_order_line_with_dateの抜粋です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
CREATE TABLE IF NOT EXISTS `sales_sample_output.ec_order_line_with_date` ( order_id STRING OPTIONS(description='注文ID'), order_line_index INT64 OPTIONS(description='明細番号'), create_date DATE OPTIONS(description='日付'), create_datetime DATETIME OPTIONS(description='日時'), customer_id STRING OPTIONS(description='顧客ID'), product_name STRING OPTIONS(description='商品名'), product_id STRING OPTIONS(description='商品ID'), product_category STRING OPTIONS(description='商品カテゴリ'), quantity INT64 OPTIONS(description='数量'), line_price INT64 OPTIONS(description='明細価格'), line_discount INT64 OPTIONS(description='明細値引き額'), line_price_before_discount INT64 OPTIONS(description='値引き前明細価格') ) PARTITION BY create_date OPTIONS ( description = 'EC注文明細(日付情報を補完)' ); |
Dataformでのデータパイプライン作成では、SQLを書くことと同じくらい、冒頭のconfigを適切に書くことが重要です。というのも、Dataformで作成させたSQLXファイルはこのconfigの情報のみを持って管理されるからです。
必須項目であるconfig.typeを除き、共通して設定しておくべき5つのパラメータ(schema, name, description, hasOutput, tags)について確認しましょう。
このSQLXファイルで参照されるテーブルがBigQuery上でどのデータセットに該当するかを定義するものです。デフォルトのデータセットは [workflow_settings.yaml] にて定義されますが、複数のデータセットにまたがる場合は明示する必要があります。結論、全てのSQLXファイルで明示的に書くことが推奨です。
というのも、config.schemaが制御する範囲が限定的となっているからです。具体的には、
という2点があります。細かいことを考えずに全てのSQLXファイルで明示的に書くことが、ルールとしても分かりやすいですし、configだけを見ておおよそファイルの理解ができて読み手にも優しいので推奨です。
SQLXのファイル名です。デフォルトはファイル名の拡張子を除いた部分になりますが、config.schemaと同じく全てのファイルで明示的に設定すること推奨です。というのも、configにデータセット名だけ書かれていてテーブル名がないのは、直感的に読みにくいからです。
レアケースに対する注意点として、config.type:'operations'のSQLXファイルで中間テーブルを含めて複数のテーブルを作成する場合、ref関数で他ファイルから参照したいテーブル名でconfig.nameを書いておく必要があります。
今回の例でも、⑤差分処理の実装(MERGE句)で紹介しているようなトランザクションテーブルでは累積テーブルと差分テーブルがありますが、config.nameで指定しているのはもちろん累積テーブルの方になります。
SQLXファイルの説明です。ここに説明を書けば、BigQuery上でテーブルの説明に反映されるので便利です。ぜひ設定しましょう。
なお、例に漏れずconfig.type:'operations'の場合はconfig.descriptionに書いてもBigQueryテーブルには反映されません。BigQueryテーブルの説明に無理やり反映するには、テーブル作成時のOPTIONSにdescriptionパラメータを設定する必要があります。
重複入力にはなりますが、読み手への優しさを考えれば共通して記述しておくべきでしょう。
ref関数の参照先にできるようにするか、COMPILED GRAPH(データリネージ)に登場させるかのパラメータです。通常はデフォルトでtrueですが、config.type:'operations'の場合のみデフォルトがfalseのため、注意が必要です。
他テーブルから参照しうる場合は、忘れずconfig.hasOut:true に設定しておきましょう。
実行時にまとめて実行する単位をタグとして設定可能です。実行タイミングの異なるSQLXファイルに名前を付けてタグとして設定しておきましょう。
長くなりましたが、Dataformのデータ処理の基本の型として①~⑩の10点をまとめました。これらを押さえておけば、とっかかりとしては十分ではないかな…と思います。
なお、今回のデータソースのサンプルはある意味現実的でだいぶ汚いため、いくつかまどろっこしいデータの加工が必要になっています。SQLを細かく読んだ人が「なぜこんなことを…?」と思いがちなポイントを4つほど挙げておきます。
データ処理の実務は思わぬところで躓きがちで、大変ですね。
この記事がいつかどこかの誰かの型の助けになれば幸いです。
最後に、本記事でも利用しているGitHubプロジェクトのコード同期についても書いておきます。
Dataformは環境の区別や本番環境へのコミットの概念はあるものの、バージョン管理ツールとしてできることは限定的です。一方で、GitHubとの連携機能を持っており、連携することでGitHubにて本格的なコードのバージョン管理が可能になるため、チームでワークフローを管理したい方にとってはオススメになります。
Dataform公式ドキュメントを参考に、以下の手順で進めました。