Chanomic Blog

FastAPIとOAuth2でユーザログイン機能(備忘録)

(last modified:
)

categories:

何をするか

以下の3つの機能を実装する。

方針

使う技術・フレームワーク、ライブラリなど

プロジェクト構成

プロジェクトディレクトリは次のようにする。

.
├── api
│   ├── cruds
│   │   ├── __init__.py
│   │   └── user.py
│   ├── models
│   │   ├── __init__.py
│   │   └── user.py
│   ├── routers
│   │   ├── __init__.py
│   │   └── auth.py
│   ├── schemas
│   │   ├── __init__.py
│   │   └── user.py
│   ├── db.py
│   ├── main.py
│   └── migrate_db.py
├── Dockerfile
├── docker-compose.yaml
├── poetry.lock
└── pyproject.toml

ちなみにこの図はtree -I __pycache__ --dirsfirsで生成。

サービス名は以下の通り。

開発環境作成

最低限の環境を作る。本質的なところではないのであまり解説はしない。

docker-compose.yamlは次のようにする。

version: '3'
services:
  app:
    build: .
    volumes:
      - .:/src
    ports:
      - 8000:8000

  db:
    image: mysql:8
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: 'yes'
      MYSQL_DATABASE: 'app-db'
      TZ: 'Asia/Tokyo'
    volumes:
      - mysql_data:/var/lib/mysq
    command: --default-authentication-plugin=mysql_native_password

volumes:
  mysql_data:

Dockerfileは次のようにする。

FROM python:3.11-buster
ENV PYTHONUNBUFFERED=1

WORKDIR /src

RUN pip install poetry
RUN poetry config virtualenvs.in-project true

ENTRYPOINT ["poetry", "run", "uvicorn", "api.main:app", "--host", "0.0.0.0", "--reload"]

ちなみに、virtualenvs.in-project tureをやっておかないと、プロジェクトディレクトリに各種ライブラリが展開されないので注意。

以下の3つのコマンドを実行する。この時点で必要なパッケージを--dependencyで指定している。

$ docker-compose build
$ docker-compose run --entrypoint=poetry app init --name app \
    --dependency fastapi \
    --dependency uvicorn[standard] \
    --dependency alembic \
    --dependency aiomysql \
$ docker-compose run --entrypoint=poetry app install --no-root

簡単にapi/main.pyを作成しておく。

from fastapi import FastAPI

app = FastAPI()

@app.get('/api/hello')
def hello():
    return 'Hello'

最後にdocker-compose upすれば、FastAPI(on uvicorn)とMySQLのサーバーが起動する。

マイグレーションスクリプト作成

alembicを初期化。

$ docker-compose run --entrypoint=poetry app run alembic init alembic

alembic.iniにMySQLコンテナへのアドレスを指定する。ついでにここにasync版のURLを書いておく。

sqlalchemy.url = mysql+pymysql://root@db:3306/app-db?charset=utf8
sqlalchemy.async_url = mysql+aiomysql://root@db:3306/app-db?charset=utf8

DBとの疎通確認も兼ねて、空のマイグレーションをしておく。

$ docker-compose run --entrypoint=poetry app run alembic revision --autogenerate -m "empty migration"
$ docker-compose run --entrypoint=poetry app run alembic upgrade head

ユーザのテーブル作成

今回はメールアドレスとパスワードで認証するような仕組みを作るので、ユーザのエンティティにはその2つを持たせる。

まずapi/db.pyを編集する。

from sqlalchemy.orm import declarative_base

Base = declarative_base()

api/models/user.pyでユーザのエンティティを作成。

from sqlalchemy import Column, Integer, String
from api.db import Base

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    email = Column(String(256), unique=True)
    password = Column(String(256))

alembic/env.pymetadataを指定。

from api.db import Base
from api.models import user

target_metadata = Base.metadata

マイグレーションをして、ユーザテーブルを作成。

$ docker-compose run --entrypoint=poetry app run alembic revision --autogenerate -m "user"
$ docker-compose run --entrypoint=poetry app run alembic upgrade head

DBのセッション作成

後々ユーザ登録とかログインで使うので、DBのセッションを作っておく。今回はDBのasync版のURLがalembic.iniに書かれているため、それを使わせてもらう。

api/db.pyに追記。

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

DB_URL = 'mysql+aiomysql://root@db:3306/app-db?charset=utf8'

async_engine = create_async_engine(DB_URL, echo=True)
async_session = sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=async_engine,
        class_=AsyncSession
)

async def get_db():
    async with async_session() as session:
        yield session

ユーザ登録処理を作成

やることは、パスワードをハッシュ化してDBに保存すること。

  1. メールとパスワードをスキーマの形で受け取る。
  2. パスワードハッシュ化する。
  3. DBに追加する。すでに存在するメールアドレスがあるなら例外を投げるはずなので、ちゃんとキャッチする。
  4. ユーザ情報を返す。

ハッシュのためにpasslibが必要なので、このタイミングで入れておく。

docker-compose run --entrypoint=poetry app add passlib[bcrypt]

準備

api/routers/user.pyでひな形を作っておく。

from fastapi import APIRouter

from api.schema import user as user_schema

router = APIRouter()

@router.post('/api/register', response_model=user_schema.User)
async def register(user_create: user_schema.UserCreate):
    pass

リクエストとレスポンスのスキーマが必要なのでそれをつくる。api/schema/user.pyを編集する。

from pydantic import BaseModel


class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str


class User(UserBase):
    id: int

    class Config:
        orm_mode=True

api/main.pyにルーターの記述を追加する。

from api.routers import user

app.include_router(user.router)

この時点でSwagger UIを開いて、/api/registerのエンドポイントが開かれていればOK。

中身の作成

api/routers/user.pyを編集する。UNIQUE制約に関する例外をキャッチする事情で、少し煩雑にはなっている。しかし、それ以外のところを見てみると、単にユーザを作って返しているだけだと分かる。 ちなみに、api.schema.userUserクラスでorm_mode=Trueにしているおかげで、api.models.user.Userが自動的にapi.schema.user.Userに変換されて出力される。

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from pymysql.err import IntegrityError
from pymysql.constants import ER

from api.db import get_db
from api.schema import user as user_schema
from api.cruds import user as user_cruds

@router.post('/api/register', response_model=user_schema.User)
async def register(user_create: user_schema.UserCreate, db: Annotated[AsyncSession, Depends(get_db)]):
    try:
        user = await user_cruds.register_user(db, user_create)
        return user
    except IntegrityError as e:
        errcode, _ = e.args
        if errcode == ER.DUP_ENTRY:
            raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail='already used email address'
                  )
        else:
            raise

api/cruds/user.pyを編集する。

パスワードをハッシュ化してDBに登録する。例外はSQLAlchemyがラップして返すので、それを引きはがして呼び出し元に任せる。

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from passlib.context import CryptContext
from pymysql.constants import ER

from api.models import user as user_model
from api.schema import user as user_schema


pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')


async def register_user(
    db: AsyncSession,
    user_create: user_schema.UserCreate
) -> user_model.User: 

    try:
        user = user_model.User(**user_create.dict())
        user.password = pwd_context.hash(user.password)

        db.add(user)
        await db.commit()
        await db.refresh(user)

        return user
    except IntegrityError as e:
        db.rollback()
        raise e.orig

トークン取得処理を作成

やることは、

  1. ROPCのフォーム形式に従い、usernamepasswordを取得。
  2. usernameに合致するユーザをDBから検索する。OAuth2の使用上usernameという名前で受け取るが、ここではemailと同じ意味。
  3. ユーザが見つかったらパスワードを照合。
  4. 2、3が成功したらトークンを生成して返す。

の4つ。それぞれについての実装方法は、

  1. fastapi.securityモジュールでOAuth2PasswordReqeustFormが提供されているのでそれを使う。
  2. api.crudsモジュールにget_user_by_emailという関数を作成し、そこでDBとの通信を行う。
  3. passlibCryptContext.verify関数を使う。
  4. ユーザを判別できるようなサブ情報(ここではemail)とトークンの有効期限を入れた辞書を作り、JWTにする。JWTの変換はjoseモジュールのjwt.encodeを用いる。

となる。なお、jwt.encodeの暗号化アルゴリズムとしてHS256を用いる。そのための鍵(シークレットキー)をあらかじめ生成しておく。

このタイミングでjoseを入れておく。また、OAuth2PasswordReqeustFormの利用のためにはmultipartも必要なため、それも入れる。

docker-compose run --entrypoint=poetry app add python-jose[cryptgraphy] python-multipart

準備

ひな形を作っておく。

api/routers/user.pyに追記する。

from api.schema import token as token_schema
from fastapi.security import OAuth2PasswordRequestForm

@router.post('/api/token', response_model=token_schema.Token)
async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
    pass

api/schema/token.pyを作成。

from pydantic import BaseModel

class Token(BaseModel):
    access_token: str
    token_type: str

この時点でSwagger UIを開いて、/api/tokenのエンドポイントが開かれていればOK。

中身の作成

api/routers/user.pyを編集。ここでは、

  1. 受け取ったusernamepasswordをそれぞれemailpasswordとみなして、合致するユーザを取得する。
  2. トークンを作成して返す。

という処理を行っている。それぞれについての細かい処理は別の関数にまとめる。

from datetime import datetime, timedelta

ACCESS_TOKEN_EXPIRE_MINUTES = 30

@router.post('/api/token', response_model=token_schema.Token)
async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[AsyncSession, Depends(get_db)]):
    user = await user_cruds.authorize_user(db, form.username, form.password)
    if user is None:
        raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail='incorrect email or password'
        )
    token = create_access_token(
            { 'sub': user.email },
            timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return token_schema.Token(access_token=token, token_type='bearer')

まずauthorize_userを実装する。これはapi/cruds/user.pyに書く。これは次の処理を行う。

  1. get_user_by_emailで、メールアドレスに合致するユーザを取得。
  2. パスワードを照合。
from sqlalchemy import select

async def get_user_by_email(
        db: AsyncSession,
        email: str
) -> user_model.User | None:
    result = await db.execute(select(user_model.User).where(user_model.User.email == email))
    row = result.first()

    if row is not None:
        return row[0]
    else:
        return None


async def authorize_user(
        db: AsyncSession,
        email: str,
        password: str,
) -> user_model.User | None:
    user = await get_user_by_email(db, email)

    if user is None:
        return None
    if not pwd_context.verify(password, user.password):
        return None

    return user

次にcreate_access_tokenを実装する。これはapi/routers/user.pyに書く。

  1. 付加的な情報dataと有効期限の情報expを結合した新しいdictを作成。
  2. JWTにエンコードして返す。
from jose import jwt

ALGORITHM = 'HS256'
SECRET_KEY ='secret'

def create_access_token(data: dict, expires_delta: timedelta):
    expire = datetime.utcnow() + expires_delta
    return jwt.encode({ **data, 'exp': expire }, SECRET_KEY, algorithm=ALGORITHM)

この時点でSwagger UIを開き、/api/tokenに登録したメールアドレス、パスワードを入力して送信すればトークンが返ってくることを確認する。

シークレットキーの分離

コード中にシークレットキーが入っているとセキュリティ的にまずい。

そこで、.envにシークレットキーの情報を書くことにする。コミットする際には、それをignoreする設定を書いておく。 そして、.envからデータを読みだすためにpydanticのBaseSettingsクラスを用いる。

まず、api/settings.pyを作成する。今回はSECRET_KEYのみ入れた設定クラスを作成する。.envを読み込んで作成するという情報もここに入れる。

from pydantic import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    secret_key: str

    class Config:
        env_file = ".env"


@lru_cache
def get_settings():
    return Settings()

つづいて、2つの関数を変更する。

from api.settings import get_settings, Settings


def create_access_token(data: dict, expires_delta: timedelta, secret_key: str):
    expire = datetime.utcnow() + expires_delta
    return jwt.encode({ **data, 'exp': expire }, secret_key, algorithm=ALGORITHM)


@router.post('/api/token', response_model=token_schema.Token)
async def login(
        form: Annotated[OAuth2PasswordRequestForm, Depends()],
        db: Annotated[AsyncSession, Depends(get_db)],
        settings: Annotated[str, Depends(get_settings)]
):
    # 中略
    token = create_access_token(
            { 'sub': user.email },
            timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            secret_key=settings.secret_key
    )
    return token_schema.Token(access_token=token, token_type="bearer")

最後に、.envを作成する。SECRET_KEYには、コマンドopenssl rand -hex 32で出力したランダムな値を入れておく。

SECRET_KEY=<openssl rand -hex 32 コマンドの実行結果>

ログインしているユーザ情報が見られるAPIを作成

やることは、

  1. トークンを取得。
  2. トークンの有効期限をチェック。
  3. トークンのsub情報からユーザ情報をDBから検索し、返す。

の4つ。それぞれについての実装方法は、

  1. fastapi.securityモジュールでOAuth2PasswordBearerが提供されているのでそれを使う。
  2. joseモジュールのjwt.decodeでJWTをデコードする。このとき有効期限情報もチェックされる(参考ソースコード)。
  3. デコードされた辞書からsub情報を取り出す。ここにはemailの情報を入れたので、これをもとにget_user_by_emailを使ってユーザを取得する。

となる。

ひな形づくり

from api.models import user as user_model
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer


oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/api/token')


async def get_user(token: Annotated[str, Depends(oauth2_scheme)], db: Annotated[AsyncSession, Depends(get_db)]):
    pass


@router.get('/api/user/me', response_model=user_schema.User)
async def get_user_me(user: Annotated[user_model.User, Depends(get_user)]):
    return user

この時点でSwagger UIを開いて、/api/user/meのエンドポイントが開かれていればOK。

さらにUIの右上にAuthorizeが開かれ、ここからログインすることができる。 ログイン後、oauth2_schemeが指定されているパスに対しては勝手にAuthorization: Bearer <token>を付加して送信してくれる。

中身の作成

書くのはこれだけ。例外周りの処理で煩雑に見えるかもしれないが、ロジックはシンプル。

  1. トークンをでコードしてメールアドレスを取り出す。
  2. メールアドレスでユーザを検索して返す。
from jose import jwt, JWTError


async def get_user(
        token: Annotated[str, Depends(oauth2_scheme)],
        db: Annotated[AsyncSession, Depends(get_db)],
        settings: Annotated[Settings, Depends(get_settings)],
):
    credentials_exception = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='could not validate credentials',
            headers={'WWW-Authenticate': 'Bearer'},
    )

    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
        email = payload.get('sub')
    except JWTError:
        raise credentials_exception

    user = await user_cruds.get_user_by_email(db, email)
    if user is None:
        raise credentials_exception

    return user

この時点でSwagger UIを開いて、Authorizeボタンからログインして、/api/user/meにGETリクエストを投げてみると、ちゃんとユーザ情報が帰ってくることが分かる。

ログインしているかどうかを判定するAPIを作成

やることは前節とほとんど変わらない。

今回は「ユーザ情報を取得出来たらログインできている」と判断し、それを真偽値で返すようなパスを作成してみる。そのために以下の工夫をする。

oauth2_scheme_noerr = OAuth2PasswordBearer(tokenUrl='/api/token', auto_error=False)


async def get_user_if_exists(
        token: Annotated[str, Depends(oauth2_scheme_noerr)],
        db: Annotated[AsyncSession, Depends(get_db)],
        settings: Annotated[Settings, Depends(get_settings)],
):
    if token is None:
        return token

    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
        email = payload.get('sub')
    except JWTError:
        return None

    user = await user_cruds.get_user_by_email(db, email)

    return user


@router.get('/api/is_logined')
async def is_logined(user: Annotated[user_model.User | None, Depends(get_user_if_exists)]):
    return { 'is_logined': user is not None }