こんにちは。那須野です。
ChatGPTは順調に進化してきて、いろいろなデータ分析に使えるようになってきていますよね。
前回はBigQueryのデータにアクセスしてデータ分析、つまりreadをさせてみましたが、今回はデータの更新、つまりwriteもさせてみようと思います。
readを比べるとwriteはリスクが高めで、思わぬテーブルやレコードをdeleteされたりupdateされたりということが怖いので、権限を細かめに設定することの重要性が増してきます。
そのあたりを頭に入れておきつつ、実際に試してみましょう。
※この記事は、前回の内容を取り組んだ方向けの内容になっています。アーキテクチャや手順など詳細は前回の方に書いてあるので、まだ見ていない方は先に読んで適宜取り組んでから以下をお読みください。
ChatGPTによるBigQueryデータ更新とは、ChatGPT画面で自然言語(日本語)でチャットで指示して、ChatGPTにBigQueryのデータを更新させていくことを指します。例えば以下のようなイメージです。
「売上テーブルを以下のロジックで集計して、会員テーブルのrankを更新して。」
「BIツールで参照しやすいようにGA4、サーチコンソール、Google広告の実績データを日次で集計してdigital_marketing_daily_indexテーブルとして保存して。」
「都道府県ごとの今日の天気予報を収集してweather_forecastテーブルにinsertして。」
いろいろできそうですよね。
BigQueryデータ更新で何よりも重要なのは、プロセスを保存→検証して品質保証できるようになる点でしょう。
というのも、ChatGPTの前回の分析の欠点は、分析した結果がChatGPTの個人チャット上にしかない点です。結果として、裏でどういう分析をしたのか分かりづらく、スレッドが流れてしまうと本人も思い出せなくなります。加えて、分析プロセスを第三者チェックするのが極めて大変です。つまるところ、ビジネスにおいて死活問題となる「組織的なデータ分析運用」に対してサポートがほとんどなされないという点です。
この点について、BigQueryを更新できるようにすることで様々な改善が見えてきます。例を挙げてみましょう。
それ以外にも、アクション経由では授受できないような大きなアウトプットをBigQueryに保存することで、多段階の分析をより安全・確実に実行できる点も見逃せないですね。
しかし、生成AIは人間と同じようにミスをしうるものです。BigQueryの閲覧だけであればリスクは少ないですが、編集の権限を与えた場合はトンデモナイことをやらかす可能性を否定できなくなります。例えばAIがうっかりデータセットごとDROPするような展開…を考慮しないといけないのです。
そのため、駆け出しエンジニアが初めての手作業をするときにベテランエンジニアが目を凝らして見守っていたのと同じように、ChatGPTを見守る仕組みが必要です。具体的には、
の4点を抑えておく必要があります。いくつか方法はあるのですが、今回は簡易的かつ効果的な方法としてサービスアカウントを3つ立てる方法を取ってみようと思います。エンドポイント実行用のrunner、BigQuery閲覧用のreader、BigQuery更新用のwriterで、それぞれに適切な権限を付与する形です。
これにより、閲覧系と更新系がサービスアカウントレベルで分離するため、それぞれの系統に対してBigQuery権限を細かに付与すること・・・例えば「このテーブルは閲覧のみ、このテーブルは更新もできる」などの設定が容易になり、実行履歴で明快に区別できるようになり、またコードに触らずとも編集系だけを即座に切るなどの対応も可能になります。
また、何かのはずみで編集権限が変なところについて変に更新できてしまうと困るので、プロンプトレベルで制御するだけでなく、サービスアカウントのデータ編集権限による制御も加えて二重のロックをかけることで、うっかりミスを極力減らすことができるはずです。
なお、ここまでやってもモノによっては復旧が困難になる可能性があるため、やはりBigQueryのタイムトラベル機能の有効化したうえで復元訓練を積んでおくまでは必須でしょう。
それでは次は、設定手順を具体的に見ていきます。
API有効化や作業ユーザーのロール設定は前回と同様なので割愛します。
今回はReadもWriteもできるということで、rwをキーワードとして足した名前で作っていこうと思います。それでは具体的な手順を見ていきましょう。
サービスアカウントの画面を開き、サービスアカウントを3つ作成してそれぞれロールを付与します。また、Read用、Write用についてはBigQueryスタジオ画面から対象となるデータセットの権限を付与します。
| Run用 | Read用 | Write用 | |
|---|---|---|---|
| 名前 | chatgpt-bq-runner@以下略 | chatgpt-bg-reader@以下略 | chatgpt-bg-writer@以下略 |
| ロール |
|
BigQuery ジョブユーザー | BigQuery ジョブユーザー |
| BigQuery権限 | なし | 対象データ全てについて、BigQueryスタジオの「共有」の権限管理にて「BigQuery 閲覧者」を付与する。 | 対象データのうち、更新してよいデータのみについて、BigQueryスタジオの「共有」の権限管理にて「BigQuery 閲覧者」と「BigQuery 編集者」を付与する。 |
| 他設定 | ※「アクセス権を持つプリンシパル」の設定を開き、Run用のサービスアカウントがアクセスできることを確認しておく。 | ※「アクセス権を持つプリンシパル」の設定を開き、Run用のサービスアカウントがアクセスできることを確認しておく。 | |
| 備考 | ※これが前回作ったものと同等のサービスアカウント。 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
import json import os import traceback import functions_framework from google.cloud import bigquery from google.auth import default, impersonated_credentials API_KEY = os.getenv("EXPECTED_API_KEY") READER_SA = os.getenv("READER_SA") WRITER_SA = os.getenv("WRITER_SA") BQ_LOCATION = os.getenv("BQ_LOCATION") def _impersonated_client(target_principal: str) -> bigquery.Client: if not target_principal: raise RuntimeError("Service Account for impersonation is not set.") source_creds, _ = default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) creds = impersonated_credentials.Credentials( source_credentials=source_creds, target_principal=target_principal, target_scopes=["https://www.googleapis.com/auth/bigquery"], lifetime=300, ) return bigquery.Client(credentials=creds, location=BQ_LOCATION) def get_bq_client(mode: str) -> bigquery.Client: m = (mode or "read").lower() if m == "write": return _impersonated_client(WRITER_SA) else: return _impersonated_client(READER_SA) @functions_framework.http def run_bq(request): if request.headers.get("x-api-key") != API_KEY: return ("Unauthorized", 401) try: body = request.get_json(silent=True) or {} mode = (body.get("mode") or "read").lower() # "read" or "write" sql = body.get("query") if not sql: return (json.dumps({"error": "'query' is required"}), 400, {"Content-Type": "application/json"}) client = get_bq_client(mode) job = client.query(sql) if mode == "read": try: import pandas as pd df = job.result().to_dataframe() df = df.where(df.notna(), None) # イレギュラーなNaNだけNone(null)に補正 df_json = df.to_json(orient="records", force_ascii=False) df_json_len = len(df_json) if df_json_len > MAX_JSON_CHARACTERS: payload = { "error": "response_records_exceeds_size_limit", "message": "responseのレコードサイズがChatGPTの受け取り上限を超えました。列を減らす、行を絞り込む、またはLIMITを設定してください。", "stats": { "rows": int(df.shape[0]), "cols": int(df.shape[1]), "characters": df_json_len, "max_characters": MAX_JSON_CHARACTERS } } return (json.dumps(payload, ensure_ascii=False), 413, {"Content-Type": "application/json"}) return (df_json, 200, {"Content-Type": "application/json"}) except Exception: rows = [dict(row) for row in job.result()] return (json.dumps(rows, ensure_ascii=False), 200, {"Content-Type":"application/json"}) else: # DML 実行結果を返す return ( json.dumps({ "job_id": job.job_id, "statement_type": getattr(job, "statement_type", None), "num_dml_affected_rows": job.num_dml_affected_rows, }), 200, {"Content-Type": "application/json"}, ) except Exception as e: # Cloud Loggingで確認できるようトレースバックを出力 print("Error during BigQuery execution:", str(e)) traceback.print_exc() return (json.dumps({"error": str(e)}), 500, {"Content-Type": "application/json"}) |
|
1 2 3 4 |
google-cloud-bigquery>=3.20.0 google-auth>=2.20.0 pandas>=2.2.0 db-dtypes>=1.0.0 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
{ "openapi": "3.1.0", "info": { "title": "BigQuery Runner", "version": "1.2.0" }, "servers": [ { "url": "https://chatgpt-bq-rw-connector-<hash>.asia-northeast1.run.app" } ], "paths": { "/read": { "post": { "operationId": "bq_read", "summary": "Run a read-only SQL query (SELECT, etc.)", "x-openai-isConsequential": false, "requestBody": { "required": true, "content": { "application/json": { "schema": { "type": "object", "properties": { "mode": { "type": "string", "enum": ["read"], "description": "必ず read を指定" }, "query": { "type": "string", "description": "Standard SQL (SELECT ...)" } }, "required": ["mode", "query"] } } } }, "responses": { "200": { "description": "Rows as JSON", "content": { "application/json": { "schema": { "type": "array", "items": { "type": "object" } } } } }, "400": { "description": "Bad request (e.g., missing 'query')", "content": { "application/json": { "schema": { "type": "object", "properties": { "error": { "type": "string" } } } } } }, "401": { "description": "Unauthorized (invalid or missing API key)" }, "413": { "description": "Payload too large (response exceeds MAX_JSON_CHARACTERS)", "content": { "application/json": { "schema": { "type": "object", "properties": { "error": { "type": "string" }, "message": { "type": "string" }, "stats": { "type": "object", "properties": { "rows": { "type": "integer" }, "cols": { "type": "integer" }, "characters": { "type": "integer" }, "max_characters": { "type": "integer" } } } }, "required": ["error", "message", "stats"] } } } }, "500": { "description": "Internal server error during BigQuery execution", "content": { "application/json": { "schema": { "type": "object", "properties": { "error": { "type": "string" } } } } } } } } }, "/write": { "post": { "operationId": "bq_write", "summary": "Run a write (DML/DDL) statement", "x-openai-isConsequential": false, "requestBody": { "required": true, "content": { "application/json": { "schema": { "type": "object", "properties": { "mode": { "type": "string", "enum": ["write"], "description": "必ず write を指定" }, "query": { "type": "string", "description": "DML/DDL statement" } }, "required": ["mode", "query"] } } } }, "responses": { "200": { "description": "Job metadata", "content": { "application/json": { "schema": { "type": "object", "properties": { "job_id": { "type": "string" }, "statement_type": { "type": ["string", "null"] }, "num_dml_affected_rows": { "type": ["integer", "null"] } } } } } }, "400": { "description": "Bad request (e.g., missing 'query')", "content": { "application/json": { "schema": { "type": "object", "properties": { "error": { "type": "string" } } } } } }, "401": { "description": "Unauthorized (invalid or missing API key)" }, "500": { "description": "Internal server error during BigQuery execution", "content": { "application/json": { "schema": { "type": "object", "properties": { "error": { "type": "string" } } } } } } } } } } } |
これら手順を踏むことで、readだけでなくwriteもできるカスタムGPTができあがり、プロンプトによる指示をしてより柔軟な分析をさせることができるでしょう。
これだけで結構実用的な分析が可能になる土台ができますが、実際のところは指示次第、プロンプト次第となります。
例えば組織的にログを取得していく上では、カスタムGPTレイヤーの指示によってログ記録を義務付けてもいいですし、場合によってはCloud Runの関数レイヤーでログ記録を実装することも可能です。
また、BigQuery上においては分析トランザクションの管理がより重要になっていくでしょう。例えばChatGPTの分析指示がいつ誰からなされ、分析Noとして何番が宛がわれ、どのユーザー指示を受けてどんなSQLをAIが書いたのかがBigQueryに保存され、それを別のエージェントが読み取って分析意図を読み解いてレビュー結果としてBigQueryに登録し、そのレビュー結果をまた別のエージェントがユーザー指示と照合して評価をし、及第点であればSQLとして実行されて集計結果が返ってきて、集計結果の解釈がBigQueryに保存されて、といったサイクルをユーザーに意識させずに回そうとしたとき、カスタムGPTやCloud Run関数、はたまたそれ以外のリソースをどう設計すべきなのか…
とりあえず全てをカスタムGPTレイヤーでどうにかしようとしたら、例えば以下のような指示が考えられます。書き方の工夫としては、以下を気を付けるとよいでしょう。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
あなたは以下のチームメンバーからなる分析チームです。以下のBigQuery環境に格納されているデータに対して、ユーザーの依頼を達成する分析を、以下の業務フローを遵守し、数値的なファクトの提示を重視して遂行してください。なお、ユーザーの確認は一切取らず,最後まで進めてください。 # チームメンバー チームメンバーは、以下の2名です。それぞれは記憶を共有せず、個別に行動します。 - アナリスト: ユーザーの依頼を受けて分析を設計→実行→報告するのが得意です。 - レビュワー: 分析を客観的に読みといて評価するのが得意です。 # 業務フロー以下の工程ごとに推進します。履歴保存はanalysis_logテーブルにします。 - 行程1.分析開始: アナリストが、analysis_logテーブルのMAX(analysis_index)+1を確認し、今回のanalysis_indexとして採番します。 - 行程2.分析モードの確認: 分析モードをREAD_ONLYモード(分析SQLはSELECTのみ)で続行するか、WRITEモード(分析SQLでは「temp_{分析ID}_{分析ステップID}_{テーブルdescription}」の命名規則に従ってテーブル名を作成してcreated_tableテーブルに記録したうえでCREATEする)に切り替えて分析するかをユーザーに尋ねます。 - 行程3.分析計画: アナリストが、ユーザーの依頼を構造的に整理し、どういった分析を行えばよいかの分析計画を提示します。分析が多段階にわたる場合は、それぞれを分析ステップとして設定します。 - 行程4.分析計画の保存: 分析計画の全文を履歴保存し、今回のanalysis_indexのレコード数を返します。 - 行程5.SQL作成: アナリストが、分析計画をもとに分析SQLを作成します。 - 行程6.SQLレビュー: レビュワーがSQLを読み解いて説明を行い、分析計画にマッチしているかどうかをフィードバックします。もしマッチしない場合は、行程5に戻ります。 - 行程7.SQLの保存: 実際のSQL全文を履歴保存し、今回のanalysis_indexのレコード数を返します。 - 行程8.分析: アナリストが分析SQLを実行し、その実行結果を読み解き、データの解釈を行います。次の分析がある場合、行程5に戻って分析を繰り返します。 - 行程9:分析レポート作成: アナリストが、ここまでの分析をまとめ、分析レポートを作成します。 - 行程10.分析レポートレビュー: レビュワーが、分析レポートをユーザーの依頼に照らし合わせて評価します。改善点がある場合、必要に応じて行程3に戻って分析を繰り返します。 - 行程11.分析の保存: 分析レポート全文を履歴保存し、今回のanalysis_indexのレコード数を返します。 - 行程12.最終報告: アナリストが、ユーザーに対して最終報告を行います。なお、端的な分かりやすさを重視しつつも、数値的なファクトの提示により説得感を強化します。 # BigQuery環境 ## データセット定義 region: asia-northeast1 project: {project_id} dataset: - {read_dataset}: データ分析対象データが格納されているデータセット。読み取り専用。 - {write_dataset}: 作業履歴の記録用データセット。読み書き可。 ## テーブル定義 ### {update_dataset}.analysis_log description: データ分析履歴 columns: - analysis_index: - description: 分析ID(今回の分析開始時にMAX(analysis_index)+1を採番し、同じ分析プロジェクトでは同じ番号を使い続ける) - data_type: INTEGER - sample: 1 - analysis_step_index: - description: 分析ステップID(何番目の分析ステップかを示す。分析が段階ではない場合、常に1となる) - data_type: INTEGER - sample: 1 - analysis_stage_index: - description: 分析行程ID(何番目の分析行程かを示す) - data_type: INTEGER - sample: 1 - analysis_stage_name: - description: 分析行程名 - data_type: STRING - sample: 要求定義 - record: - description: 履歴(この工程で出力したチャット全文やSQL全文を文字列で記録する) - data_type: STRING - sample: 分かりました!これから実際の分析に入ります。まず…(以下略) - created_at: - description: 作成時刻 - data_type: TIMESTAMP - sample: 2025-08-31 15:30:45 UTC ### {update_dataset}.created_table description: 作成テーブル columns: - table_name: - description: テーブル名 - data_type: STRING - sample: 1_1_4_rfm_segment - analysis_index: - description: テーブル作成時の分析ID - data_type: INTEGER - sample: 1 - analysis_step_index: - description: テーブル作成時の分析ステップID - data_type: INTEGER - sample: 1 - analysis_stage_index: - description: テーブル作成時の分析行程ID - data_type: INTEGER - sample: 1 - analysis_stage_name: - description: テーブル作成時の分析行程名 - data_type: STRING - sample: 要求定義 - created_at: - description: 作成時刻 - data_type: TIMESTAMP - sample: 2025-08-31 15:30:45 UTC ### {read_dataset}.{table_name} ### {read_dataset}.{table_name} ### {read_dataset}.{table_name} ※必要なテーブル分だけ書く |
なかなか長いですが、これぐらい書けば割と汎用的に機能しそうですね。
ということで、BigQueryを更新しながらのデータ分析についてまとめてみました。今回の制御はほぼ全てをフロントエンド(ChatGPTのプロンプト側)に寄せた実装なので、作るのは比較的簡単でしたね。
一方で、確実性やセキュリティを高めるには制御をバックエンド側(Cloud RunのPython側)に寄せる実装というのも選択肢に入ってくるでしょう。
どちらにしろ、こういうのができるようになってくるとワークフローの言語化スキルがより重要になってくるように感じますね。
以上、本日の備忘録でした。いつかどこかの誰かの参考になれば幸いです。
