SupabaseとFastAPIの非同期接続に関する備忘録

supabase and fastapi Python

こちらの記事でBackendアプリの構築にSupabaseを利用する際の備忘録。

ChatGPTを活用したAIチャットアプリの開発 - Backend開発編
前回、ChatGPTと対話を繰り返して、AIチャットアプリの要件整理を行いました。 使用するテックスタックの整理、簡易システム構成図の作成、API設計、テーブル設計。 この記事では、実際にbackend側のコードをChatGPTに生成してもらいます。

Python FastAPIを用いてSupabseに接続する際の設定を記事にしています。

Python>=3.12。またFastAPIでSupabaseを使うには、以下あたりのパッケージが必要。

# requirements.txt
fastapi==0.110.0
uvicorn==0.29.0
SQLAlchemy==2.0.29
asyncpg==0.29.0
psycopg2==2.9.10
supabase==2.15.0

Supabase URL Setting

Supabaseには以下のようなconnection URLが用意されている。

# Supabase connection
Direct connection:
postgresql://postgres:[YOUR-PASSWORD]@db.[My-Project].supabase.co:5432/postgres

Transaction pooler:
postgresql://postgres.[My-Project]:[YOUR-PASSWORD]@aws-0-ap-northeast-1.pooler.supabase.com:6543/postgres

Session pooler:
postgresql://postgres.[My-Project]:[YOUR-PASSWORD]@aws-0-ap-northeast-1.pooler.supabase.com:5432/postgres

Python .env

# Transaction pooler (port=6543)
DATABASE_URL="postgresql+asyncpg://postgres.[My-Project]:[YOUR-PASSWORD]@aws-0-ap-northeast-1.pooler.supabase.com:6543/postgres"

+asyncpg を追加している。これがないと非同期接続処理がうまくいかない。

port=6543としても接続することはできるが次のようなエラーが出る可能性が高い。

'asyncpg.exceptions.DuplicatePreparedStatementError'>: prepared statement "__asyncpg_stmt_6__" already exists

HINT:
NOTE: pgbouncer with pool_mode set to "transaction" or
"statement" does not support prepared statements properly.

You have two options:
* if you are using pgbouncer for connection pooling to a
  single server, switch to the connection pool functionality
  provided by asyncpg, it is a much better option for this
  purpose;

* if you have no option of avoiding the use of pgbouncer,
  then you can set statement_cache_size to 0 when creating
  the asyncpg connection object.

[SQL: SELECT "user".id, "user".email, "user".name, "user".password, "user".preferred_language, "user".created_at
FROM "user"
WHERE "user".id = $1::UUID]
[parameters: ('19186bee-f422-4efa-8f29-dd06528d0925',)]
(Background on this error at: <https://sqlalche.me/e/20/f405>)
2025-04-22 15:18:25,056 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-04-22 15:18:25,057 INFO sqlalchemy.engine.Engine SELECT "user".id, "user".email, "user".name, "user".password, "user".preferred_language, "user".created_at
FROM "user"
WHERE "user".id = $1::UUID
2025-04-22 15:18:25,058 INFO sqlalchemy.engine.Engine [cached since 0.1802s ago] ('19186bee-f422-4efa-8f29-dd06528d0925',)
2025-04-22 15:18:25,164 INFO sqlalchemy.engine.Engine SELECT conversation.id, conversation.user_id, conversation.title, conversation.created_at
FROM conversation
WHERE conversation.user_id = $1::UUID ORDER BY conversation.created_at DESC
2025-04-22 15:18:25,165 INFO sqlalchemy.engine.Engine [generated in 0.00050s] (UUID('19186bee-f422-4efa-8f29-dd06528d0925'),)      
2025-04-22 15:18:25,232 INFO sqlalchemy.engine.Engine ROLLBACK
INFO:     127.0.0.1:61405 - "GET /user-conversations?user_id=19186bee-f422-4efa-8f29-dd06528d0925 HTTP/1.1" 500 Internal Server Error

Transaction pooler についてGeminiに聞いたときの解説と提案

  • Transaction Pooler (ポート 6543): クエリの実行中のみ接続を保持し、トランザクション終了後に接続をプールに戻します。プリペアドステートメントとの相性が悪く、今回のエラーメッセージで指摘されているモードです。
  • Session Pooler (ポート 5432): クライアントセッションの間、専用の接続を維持します。プリペアドステートメントを適切にサポートします。
  • Direct Connection (ポート 5432): プールを経由せず、データベースに直接接続します。

現在の状況と取るべき対策:

エラーメッセージが pgbouncer (Supabase のプール基盤) の設定を示唆している以上、最も可能性の高い解決策は、Transaction Pooler (ポート 6543) ではなく、Session Pooler (ポート 5432) を使用するように接続URLを変更することです。

そのためURLの設定を以下に変える。

# Session pooler(port=5432)
DATABASE_URL="postgresql+asyncpg://postgres.[My-Project]:[YOUR-PASSWORD]@aws-0-ap-northeast-1.pooler.supabase.com:5432/postgres"

正常時のログ

(.venv) PS D:\\Github\\PythonWorks\\AISpeak> uvicorn app.main:app --reload --port 8000 --log-level debug

INFO:     Will watch for changes in these directories: ['D:\\\\Github\\\\PythonWorks\\\\AISpeak']
INFO:     Uvicorn running on <http://127.0.0.1:8000> (Press CTRL+C to quit)
INFO:     Started reloader process [21336] using StatReload
INFO:     Started server process [10456]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:63332 - "OPTIONS /user-conversations?user_id=19186bee-f422-4efa-8f29-dd06528d0925 HTTP/1.1" 200 OK

2025-04-22 16:03:20,714 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2025-04-22 16:03:20,715 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-04-22 16:03:20,756 INFO sqlalchemy.engine.Engine select current_schema()
2025-04-22 16:03:20,756 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-04-22 16:03:20,819 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2025-04-22 16:03:20,822 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-04-22 16:03:20,860 INFO sqlalchemy.engine.Engine BEGIN (implicit)

2025-04-22 16:03:20,870 INFO sqlalchemy.engine.Engine SELECT "user".id, "user".email, "user".name, "user".password, "user".preferred_language, "user".created_at
FROM "user"
WHERE "user".id = $1::UUID
2025-04-22 16:03:20,870 INFO sqlalchemy.engine.Engine [generated in 0.00044s] ('19186bee-f422-4efa-8f29-dd06528d0925',)
2025-04-22 16:03:20,911 INFO sqlalchemy.engine.Engine SELECT conversation.id, conversation.user_id, conversation.title, conversation.created_at
FROM conversation
WHERE conversation.user_id = $1::UUID ORDER BY conversation.created_at DESC

非同期セッション

create_async_engine を使う。

また以下を引数に指定する。

connect_args={"statement_cache_size": 0}

以下のようなファイルを1つ作成して、get_dbを共通で使うようにする。

# app/db.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from app.config import DATABASE_URL

engine = create_async_engine(DATABASE_URL, echo=True, connect_args={"statement_cache_size": 0})

async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()

async def get_db():
    async with async_session() as session:
        try:
            yield session
        except Exception as e:
            await session.rollback()
            raise HTTPException(status_code=500, detail=str(e))
        finally:
            await session.close()

DATABASE_URLはconfig.pyから読み取っている。

# app/config.py
import os
import socket

from dotenv import load_dotenv

load_dotenv()
socket.getaddrinfo('localhost', 8080)
DATABASE_URL = os.getenv("DATABASE_URL")