Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

FastAPIの標準機能+αで作るアクセスコントロール

初めまして。技術本部Digitization部データ化グループでエンジニアをしている池田力です。

FastAPIでアクセスコントロールの仕組みを実装しようとしたところ、インターネット上に情報が少なく、ちょうど要件に合ったライブラリもなかったため、0から設計することになりました。今回はその中からエッセンスを抽出し、デモアプリケーションを作成する手順を紹介します。

背景

Sansanにはオペレーションセンターという部署があり、そこで名刺や請求書のデータ化・スキャンの業務を行っています。オペレータが作業する画面を作る際に最も重要な要素の一つが権限の制御です。オペレーションセンターが扱うのはお客様から預かった委託データであり、オペレータが作業に関連しない情報を見れないようアクセスコントロールを行わなければなりません。

一方で、オペレーションセンターにおいて管理業務も大きな課題となります。どんなに厳格にアクセスコントロールできたとしても作業者のアクセス付与に時間がかかってしまうと、効率的な運営が行えなくなってしまいます。

そのため、「セキュリティと利便性を両立させる」*1ことを意識して設計と実装を行う必要がありました。

最終的な構成

データモデルはユーザー・グループ・ロールの3階層に分ける形にしました。ユーザーとグループは多対1対応、グループとロールは多対多対応、ロールとURLのパスで多対多の対応となっています。このような階層に分けることで過不足なくアクセスコントロールを実現し、新規ユーザー追加時の権限設定の手間、新規ロール追加時のユーザー設定の手間を省くことができます。

User *- Group *-* Role *-* URL Path

ソースコードの全体像を先に見たい方は こちら をご覧ください。

個別の処理

それぞれの階層の紐付け

今回は説明を簡素にするためにPython上でハードコードしたもので説明します。実際に実装する際は適宜データベースを用意してください。

class Group(str, Enum):
    admin = "admin"
    userGroup1 = "user-group-1"
    userGroup2 = "user-group-2"


class Role(str, Enum):
    admin = "admin"
    common = "common"
    items = "items"


userGroupMapping: dict[str, Group] = {
    "admin": Group.admin,
    "user-1": Group.userGroup1,
    "user-2": Group.userGroup1,
    "user-3": Group.userGroup2,
}

groupRoleMapping: dict[Group, list[Role]] = {
    Group.admin: [*Role],
    Group.userGroup1: [Role.common, Role.items],
    Group.userGroup2: [Role.common],
}

ログイン周りの処理

認証情報を保持するクラスとして AuthInfo を定義します。

ユーザーIDとパスワードの検証を行った後、上記で定義した userGroupMapping でグループを取得し、 groupRoleMapping からロールのリストを取得しています。

class AuthInfo(BaseModel):
    userId: str
    roles: list[Role]
    exp: datetime

    def token(self) -> str:
        return jwt.encode(self.dict(), SECRET, algorithm="HS256")

    @classmethod
    def login(cls, response: Response, userId: str, password: str) -> "AuthInfo":
        if userId not in userGroupMapping:
            raise HTTPException(status_code=403)

        # 説明用のアプリケーションのため省略
        if password == "":
            raise HTTPException(status_code=403)

        authInfo = AuthInfo(
            userId=userId,
            roles=groupRoleMapping[userGroupMapping[userId]],
            exp=datetime.now(tz=timezone.utc) + SESSION_TTL,
        )

        response.set_cookie(
            key=COOKIE_SESSION_KEY,
            value=authInfo.token(),
            secure=True,
            httponly=True,
            samesite="lax",
        )

        return authInfo

認証された情報をJWTによってトークン化し、Cookieのsessionにセットし、レスポンスを返します。 JWTのペイロードには以下のようにuserIdとrolesの値が入ります。

{
    "roles": [
        "admin",
        "common",
        "items"
    ],
    "userId": "admin"
}

アクセス権限の付与

アクセス権限の付与にはAccessControlクラスを定義し、FastAPIのdependencyに引き渡します。

class AuthInfo(BaseModel):
    @classmethod
    def decode(cls, session: Optional[str]) -> "AuthInfo":
        if session is None:
            raise AuthenticationError("トークンが無効です")

        try:
            data = jwt.decode(session, SECRET, algorithms=["HS256"])
            authInfo = AuthInfo(
                roles=data["roles"], userId=data["userId"], exp=data["exp"]
            )
        except (
            pydantic.error_wrappers.ValidationError,
            KeyError,
            jwt.exceptions.InvalidSignatureError,
            jwt.exceptions.ExpiredSignatureError,
        ) as e:
            raise AuthenticationError("トークンが無効です") from e

        return authInfo


@dataclass(frozen=True, eq=True)
class AccessControl:
    permit: set[Role]

    def __call__(
        self,
        session: Optional[str] = Cookie(default=None, alias="session"),
    ) -> AuthInfo:
        try:
            authInfo = AuthInfo.decode(session)
        except AuthenticationError as e:
            raise HTTPException(status_code=403) from e

        if not self.has_compatible_role(authInfo):
            raise HTTPException(status_code=403)

        return authInfo

    def has_compatible_role(self, requesterAuthInfo: AuthInfo) -> bool:
        requesterRoles = set(requesterAuthInfo.roles)
        return len(self.permit.intersection(requesterRoles)) > 0

__call__ メソッドの session 引数には、ログイン時に Set-Cookie したJWTトークンが渡されます。JWTトークンをデコードし、ユーザーが持っているロールが authInfo として復元されます。

ルーティングのコードでは、AccessControlクラスに許可するロールの一覧をsetで渡してあげます。

@app.get(
    "/items/", dependencies=[
        Depends(AccessControl(permit={Role.items, Role.admin}))
    ]
)
def getItems():
    ...


@app.delete(
    "/items/", dependencies=[
        Depends(AccessControl(permit={Role.admin}))
    ]
)
def deleteItem():
    ...

ここまで実装すれば、 GET /items/ にアクセスしたユーザーが、 itemsadmin のロールを持っていればそのまま処理が続行されてデータを返却し、持っていなければ403 Forbiddenのレスポンスが返却されるようになります。

# 権限があるユーザーの場合200系のレスポンスが返ってくること
$ http --print=h --session=test-admin POST localhost:8000/auth/ userId=admin password=password
HTTP/1.1 200 OK
content-length: 4
content-type: application/json
date: Sun, 28 Aug 2022 15:11:48 GMT
server: uvicorn
set-cookie: session=eyJ0eXAi...; HttpOnly; Path=/; SameSite=lax; Secure

$ http --print=h --session=test-admin GET localhost:8000/items/ | head -n1
HTTP/1.1 200 OK

# 権限のないユーザーの場合403のレスポンスが返ってくること
$ http --print=h --session=test-user-3 POST localhost:8000/auth/ userId=user-3 password=password
HTTP/1.1 200 OK
content-length: 4
content-type: application/json
date: Sun, 28 Aug 2022 15:12:11 GMT
server: uvicorn
set-cookie: session=eyJ0eXAi...; HttpOnly; Path=/; SameSite=lax; Secure

$ http --print=h --session=test-user-3 GET localhost:8000/items/ | head -n1
HTTP/1.1 403 Forbidden

自動テスト(Pytest)

最後にテストでどのようなコードを書けば実現できるかについて紹介します。 冒頭で申し上げた通り、権限がある人にだけ必要な情報が見えるようにする必要があるため、テストで権限を確認することは大切です。

FastAPIのdependencyを使うことでテストがしやすくなりました。 具体的には以下のようなコードを記述することで、テスト時に任意のユーザー・ロールを再現することができます。

import pytest
from fastapi import HTTPException
from fastapi.testclient import TestClient
from more_itertools import powerset

from auth import AccessControl, AuthInfo, Role
from main import app


@pytest.fixture
def test_client() -> TestClient:
    return TestClient(app)


@pytest.fixture(params=["user-1"])
def override_access_control_user(request):
    return request.param


@pytest.fixture
def override_access_control(role_patterns, override_access_control_user):
    def mock_access_control(access_control):
        def _mock_access_control():
            accessControlData = AuthInfo(
                roles=role_patterns, userId=override_access_control_user
            )
            if access_control.has_compatible_role(accessControlData):
                return accessControlData
            else:
                raise HTTPException(status_code=403)

        return _mock_access_control

    for mock_target_roles in powerset([*Role]):
        app.dependency_overrides[
            AccessControl(permit=set(mock_target_roles))
        ] = mock_access_control(AccessControl(permit=set(mock_target_roles)))

    yield

    for mock_target_roles in powerset([*Role]):
        del app.dependency_overrides[AccessControl(permit=set(mock_target_roles))]

override_access_control のfixtureを使い、 アクセス者のロールを role_patterns に入れてあげれば、権限ごとにAPIのアクセス権限があるかどうかのテスト実現できます。

import pytest
from fastapi.testclient import TestClient

from auth import Role

@pytest.mark.usefixtures("override_access_control")
@pytest.mark.parametrize(
    "role_patterns",
    [
        {Role.admin},
    ],
)
def test_admin(test_client: TestClient):
    response = test_client.get("/items/")
    assert response.status_code == 200


@pytest.mark.usefixtures("override_access_control")
@pytest.mark.parametrize(
    "role_patterns",
    [
        {Role.common},
    ],
)
def test_forbidden(test_client: TestClient):
    response = test_client.get("/items/")
    assert response.status_code == 403

まとめ

  • アクセスコントロールのデータモデルはユーザー・グループ・ロールの3階層に分けると柔軟なアクセスコントロールの仕組みを作れる
  • FastAPIのdependencyを活用すると読みやすく、テストしやすいコードになる

今回用いたコードは https://github.com/Tsutomu-Ikeda/fastapi-access-control-example においているのでぜひ参考にしてみてください。

終わりに

技術本部Digitization部データ化グループでは一緒に働く仲間を募集しています。機械と人力を組み合わせた精度の高いデータ化に興味がある方はぜひ採用ページをご覧ください。

hrmos.co

hrmos.co

*1:企業理念である「Sansanのカタチ」のPremiseとして掲げ、日々向き合っています。https://jp.corp-sansan.com/company/philosophy/

© Sansan, Inc.