初めまして。技術本部Digitization部データ化グループでエンジニアをしている池田力です。
FastAPIでアクセスコントロールの仕組みを実装しようとしたところ、インターネット上に情報が少なく、ちょうど要件に合ったライブラリもなかったため、0から設計することになりました。今回はその中からエッセンスを抽出し、デモアプリケーションを作成する手順を紹介します。
背景
Sansanにはオペレーションセンターという部署があり、そこで名刺や請求書のデータ化・スキャンの業務を行っています。オペレータが作業する画面を作る際に最も重要な要素の一つが権限の制御です。オペレーションセンターが扱うのはお客様から預かった委託データであり、オペレータが作業に関連しない情報を見れないようアクセスコントロールを行わなければなりません。
一方で、オペレーションセンターにおいて管理業務も大きな課題となります。どんなに厳格にアクセスコントロールできたとしても作業者のアクセス付与に時間がかかってしまうと、効率的な運営が行えなくなってしまいます。
そのため、「セキュリティと利便性を両立させる」*1ことを意識して設計と実装を行う必要がありました。
最終的な構成
データモデルはユーザー・グループ・ロールの3階層に分ける形にしました。ユーザーとグループは多対1対応、グループとロールは多対多対応、ロールとURLのパスで多対多の対応となっています。このような階層に分けることで過不足なくアクセスコントロールを実現し、新規ユーザー追加時の権限設定の手間、新規ロール追加時のユーザー設定の手間を省くことができます。
User *- Group *-* Role *-* URL Path
ソースコードの全体像を先に見たい方は こちら をご覧ください。
個別の処理
それぞれの階層の紐付け
今回は説明を簡素にするためにPython上でハードコードしたもので説明します。実際に実装する際は適宜データベースを用意してください。
class Group(str, Enum): admin = "admin" userGroup1 = "user-group-1" userGroup2 = "user-group-2" class Role(str, Enum): admin = "admin" common = "common" items = "items" userGroupMapping: dict[str, Group] = { "admin": Group.admin, "user-1": Group.userGroup1, "user-2": Group.userGroup1, "user-3": Group.userGroup2, } groupRoleMapping: dict[Group, list[Role]] = { Group.admin: [*Role], Group.userGroup1: [Role.common, Role.items], Group.userGroup2: [Role.common], }
ログイン周りの処理
認証情報を保持するクラスとして AuthInfo
を定義します。
ユーザーIDとパスワードの検証を行った後、上記で定義した userGroupMapping
でグループを取得し、 groupRoleMapping
からロールのリストを取得しています。
class AuthInfo(BaseModel): userId: str roles: list[Role] exp: datetime def token(self) -> str: return jwt.encode(self.dict(), SECRET, algorithm="HS256") @classmethod def login(cls, response: Response, userId: str, password: str) -> "AuthInfo": if userId not in userGroupMapping: raise HTTPException(status_code=403) # 説明用のアプリケーションのため省略 if password == "": raise HTTPException(status_code=403) authInfo = AuthInfo( userId=userId, roles=groupRoleMapping[userGroupMapping[userId]], exp=datetime.now(tz=timezone.utc) + SESSION_TTL, ) response.set_cookie( key=COOKIE_SESSION_KEY, value=authInfo.token(), secure=True, httponly=True, samesite="lax", ) return authInfo
認証された情報をJWTによってトークン化し、Cookieのsessionにセットし、レスポンスを返します。 JWTのペイロードには以下のようにuserIdとrolesの値が入ります。
{ "roles": [ "admin", "common", "items" ], "userId": "admin" }
アクセス権限の付与
アクセス権限の付与にはAccessControlクラスを定義し、FastAPIのdependencyに引き渡します。
class AuthInfo(BaseModel): @classmethod def decode(cls, session: Optional[str]) -> "AuthInfo": if session is None: raise AuthenticationError("トークンが無効です") try: data = jwt.decode(session, SECRET, algorithms=["HS256"]) authInfo = AuthInfo( roles=data["roles"], userId=data["userId"], exp=data["exp"] ) except ( pydantic.error_wrappers.ValidationError, KeyError, jwt.exceptions.InvalidSignatureError, jwt.exceptions.ExpiredSignatureError, ) as e: raise AuthenticationError("トークンが無効です") from e return authInfo @dataclass(frozen=True, eq=True) class AccessControl: permit: set[Role] def __call__( self, session: Optional[str] = Cookie(default=None, alias="session"), ) -> AuthInfo: try: authInfo = AuthInfo.decode(session) except AuthenticationError as e: raise HTTPException(status_code=403) from e if not self.has_compatible_role(authInfo): raise HTTPException(status_code=403) return authInfo def has_compatible_role(self, requesterAuthInfo: AuthInfo) -> bool: requesterRoles = set(requesterAuthInfo.roles) return len(self.permit.intersection(requesterRoles)) > 0
__call__
メソッドの session
引数には、ログイン時に Set-Cookie
したJWTトークンが渡されます。JWTトークンをデコードし、ユーザーが持っているロールが authInfo
として復元されます。
ルーティングのコードでは、AccessControlクラスに許可するロールの一覧をsetで渡してあげます。
@app.get( "/items/", dependencies=[ Depends(AccessControl(permit={Role.items, Role.admin})) ] ) def getItems(): ... @app.delete( "/items/", dependencies=[ Depends(AccessControl(permit={Role.admin})) ] ) def deleteItem(): ...
ここまで実装すれば、 GET
/items/
にアクセスしたユーザーが、 items
か admin
のロールを持っていればそのまま処理が続行されてデータを返却し、持っていなければ403 Forbiddenのレスポンスが返却されるようになります。
# 権限があるユーザーの場合200系のレスポンスが返ってくること $ http --print=h --session=test-admin POST localhost:8000/auth/ userId=admin password=password HTTP/1.1 200 OK content-length: 4 content-type: application/json date: Sun, 28 Aug 2022 15:11:48 GMT server: uvicorn set-cookie: session=eyJ0eXAi...; HttpOnly; Path=/; SameSite=lax; Secure $ http --print=h --session=test-admin GET localhost:8000/items/ | head -n1 HTTP/1.1 200 OK # 権限のないユーザーの場合403のレスポンスが返ってくること $ http --print=h --session=test-user-3 POST localhost:8000/auth/ userId=user-3 password=password HTTP/1.1 200 OK content-length: 4 content-type: application/json date: Sun, 28 Aug 2022 15:12:11 GMT server: uvicorn set-cookie: session=eyJ0eXAi...; HttpOnly; Path=/; SameSite=lax; Secure $ http --print=h --session=test-user-3 GET localhost:8000/items/ | head -n1 HTTP/1.1 403 Forbidden
自動テスト(Pytest)
最後にテストでどのようなコードを書けば実現できるかについて紹介します。 冒頭で申し上げた通り、権限がある人にだけ必要な情報が見えるようにする必要があるため、テストで権限を確認することは大切です。
FastAPIのdependencyを使うことでテストがしやすくなりました。 具体的には以下のようなコードを記述することで、テスト時に任意のユーザー・ロールを再現することができます。
import pytest from fastapi import HTTPException from fastapi.testclient import TestClient from more_itertools import powerset from auth import AccessControl, AuthInfo, Role from main import app @pytest.fixture def test_client() -> TestClient: return TestClient(app) @pytest.fixture(params=["user-1"]) def override_access_control_user(request): return request.param @pytest.fixture def override_access_control(role_patterns, override_access_control_user): def mock_access_control(access_control): def _mock_access_control(): accessControlData = AuthInfo( roles=role_patterns, userId=override_access_control_user ) if access_control.has_compatible_role(accessControlData): return accessControlData else: raise HTTPException(status_code=403) return _mock_access_control for mock_target_roles in powerset([*Role]): app.dependency_overrides[ AccessControl(permit=set(mock_target_roles)) ] = mock_access_control(AccessControl(permit=set(mock_target_roles))) yield for mock_target_roles in powerset([*Role]): del app.dependency_overrides[AccessControl(permit=set(mock_target_roles))]
override_access_control
のfixtureを使い、 アクセス者のロールを role_patterns
に入れてあげれば、権限ごとにAPIのアクセス権限があるかどうかのテスト実現できます。
import pytest from fastapi.testclient import TestClient from auth import Role @pytest.mark.usefixtures("override_access_control") @pytest.mark.parametrize( "role_patterns", [ {Role.admin}, ], ) def test_admin(test_client: TestClient): response = test_client.get("/items/") assert response.status_code == 200 @pytest.mark.usefixtures("override_access_control") @pytest.mark.parametrize( "role_patterns", [ {Role.common}, ], ) def test_forbidden(test_client: TestClient): response = test_client.get("/items/") assert response.status_code == 403
まとめ
- アクセスコントロールのデータモデルはユーザー・グループ・ロールの3階層に分けると柔軟なアクセスコントロールの仕組みを作れる
- FastAPIのdependencyを活用すると読みやすく、テストしやすいコードになる
今回用いたコードは https://github.com/Tsutomu-Ikeda/fastapi-access-control-example においているのでぜひ参考にしてみてください。
終わりに
技術本部Digitization部データ化グループでは一緒に働く仲間を募集しています。機械と人力を組み合わせた精度の高いデータ化に興味がある方はぜひ採用ページをご覧ください。
*1:企業理念である「Sansanのカタチ」のPremiseとして掲げ、日々向き合っています。https://jp.corp-sansan.com/company/philosophy/