Back-End/Fastapi

6. 역할에 따른 통제권한 (Role-Based Access Control)/Part01

sd4beatles 2024. 10. 16. 05:31

1 Introduction

대부분 모든 데이터베이스 시스템은 '통제권한'을 가지게 된다. Admin 사용자는 테이블 생성,삭제, 수정 및 모든 권한을 가지게 되며,그에 반면에 '일반 유저'는 시스템에서 할 수 있는 제한된 권한을 가지게 됩니다. 지금까지 우리가 생성했던,모든 프로젝트들은 이러한 권한제어 없이, 모든 이들에게 'full access'를 준 경우이며,이는 현실과 동떨어지게 되었다고 생각했습니다. 이러한 권한제어를 통제할 방법이 없을까 생각하던 도중에, 'medium'에서 쓸 블로그를 읽고 다시 내것으로 재 정리하는 코드를 올려봅니다.

(EN)

"Every database system must establish a clear distinction between admin users and regular users. Admins have full access to create and modify any table in the system, while regular users have restricted access. In this lesson, we'll discuss how to manage their roles by controlling access permissions."

2. Background

✍️ Note
아래 코드는 이전 blog의 _확장_이므로, 이미 기존의 코드를 안다고 가정하에 만들었습니다.처음 접하는 분을 위해서 간략하게 소개를 하고 setting으로 넘어가겠습니다.

  • AcessTokenBearer
    1) fastapi.sercurity에 제공하는 HTTPBearer의 class를 물려받음 class입니다.
    2) user가 보내준 token을 verification(증명)하는 클래스이며, 정상으로 판단되면 user_credential 정보를 dictionary 형태로 전달합니다.
  • get_session
    1) database session과 관련된 함수입니다.
    2) 데이터베이스 세션이란?
    "데이터베이스 세션은 데이터베이스와 상호작용하기 위한 임시 작업 공간입니다. 여러 작업(쿼리, 업데이트 등)을 하나의 트랜잭션으로 묶을
    수 있게 해줍니다"
  • get_user_by_conditions
    1) user_name 또는 email을 전달하면, 그에 상응하는 user를 찾아 전달해주는 함수입니다.
    2) arguments는 user_name,email,session을 전달해야 합니다.
    3) 더 많은 정보는 이전의 blog를 참고하시기 바랍니다.
  • User
    1) SQLModel 상속된 class로서, database의 table의 column 정보와 각 table들의 관계를 정의하고 있음

3. Implemtation

3.1 find current user

user의 record 정보를 찾을 수 있는 함수 하나를 정의합니다. 이 function은 argument에서 받은 token_detail의 정보와 일치하는 레코드를 찾아서 _User클래스*_의 객체형태로 전달하게 됩니다.

//src.auth.dependencies.py

async def find_current_user(token_details:dict=Depends(AccessTokenBearer()),
                     session:AsyncSession=Depends(get_session)):
  # token_deatils does not contain user_name
  user_email,user_name=token_details['email'],None

  #using get_user
  user=await auth_service.get_user_by_conditioins(user_name=user_name,email=user_email,session=session)

  return user

3.2 Role Checker

일반적인 클래스이면서, argument로 allowed_roles의 list를 받게 됩니다. 그리고, 만약 user의 정보중에 role 이 list에 속한다면, True값을 전달하게 되며, 그렇지않으면 403_FORBIDDEN error를 전달하게 됩니다. 코드는 아래와 같습니다.

# This class only checks if a user bleongs ot any of the defined roles. 
class RoleChecker:
  def __init__(self,allowed_roles:List[str]):
    self.allowed_roles=allowed_roles

  def __call__(self,current_user:User=Depends(find_current_user)) -> Any:
    if current_user.role in self.allowed_roles:
      return True

    raise HTTPException(
      status_code=status.HTTP_403_FORBIDDEN,
      detail="You are not permitted to perform this action"
    )

3.3 Modify Dependet Functions

3.3.1 User

이전에 sqlmodel을 통해서 만들어진, User model에 추가적으로 'role'이라는 column을 추가할 필요가 있습니다.
이는 아래와 같은데

/src/models.py

class User(SQLModel,table=True):
  __tablename__="users"

  uid:uuid.UUID=Field(
    sa_column=Column(
      pg.UUID,
      nullable=False,
      primary_key=True,
      default=uuid.uuid4
    ))
  user_name:str=Field(unique=True,nullable=False)
  email:str=Field(unique=True,nullable=False)
  first_name:str
  last_name:str
  passowrd_hash:str=Field(default=False)
  is_verified:bool=Field(default=False)
  updated_at:datetime=Field(sa_column=Column(pg.TIMESTAMP,default=datetime.now,nullable=False))

  # #add user's role to our database table (이 부분을 추가해줄 것)
  role:str=Field(sa_column=Column(pg.VARCHAR,nullable=False,server_default="user"))

  def __repr(self):
    return f"<User {self.user_name}>"

추가적으로 이러한 부분은 database table의 구조를 변경하는 것이므로, alembic을 통해서 control해줍니다.

alembic revision --autogenerate -m "add role colum to User class"
alembic upgrade head

3.3.2 AuthService.create_user function

auth/service.py에서 우리는 AuthService class를 정의내렸으며,이 class 함수 중에 create_user 함수에 추가적인 정보를 제공해준다.또한, create_access_token을 만드는 payload에 다가 user_role을 추가적으로 집어넣어준다.



class AuthService:

  async def create_user(self,account:UserInput,session:AsyncSession):
      user_dict=account.model_dump()

      #create a string of hex digits in standard form
      uid=uuid.uuid4()

      new_user=User(
        uid=uid,
        updated_at=datetime.now(),
        **user_dict
      )

      #hasing the confidential information
      new_user.passowrd_hash=hash(user_dict['password_hash'])
      #이 부분을 추가해줄 것. 
      new_user.role="user"
      session.add(new_user)

      await session.commit()
      return new_user


  async def create_bear_token(self,user_accnt:UserLogin,session:AsyncSession):
    - 생략 -
    access_token=create_access_token(
      user_data={
        'email':user.email,
        'user_uid':str(user.uid),
        #token 생성시 payload에 role도 집어넣어 줄것
        'user_role':user.role},
      expires_delta=expires_delta)

    refresh_token=create_access_token(
      user_data={
        'email':user.email,
        'user_uid':str(user.uid),
         #token 생성시 payload에 role도 집어넣어 줄것
        'user_role':user.role},
      refresh=True,
      expires_delta=expires_delta)

    return JSONResponse(
      content={
        "message":"Login Success",
        "access_token":access_token,
        "refresh_token":refresh_token,
        "token_type":"bearer",
        "user":{
          "email":user.email,
          "uid":str(user.uid)
        }})

3.3.3 Set a restriction to access database

아래와 같이 end-point를 지정해주는 파이썬 파일에, dependencies를 추가해줌으로써, admin 만 입장가능하게 해주거나 아니면 모두 허용해주는 추가적인 제한을 지정해줄 수 있습니다.


from src.auth.dependencies import RoleChecker

#admin만 허용하는 rolec_checker객체 
role_checker_admin=Depends(RoleChecker(allowed_roles=['admin']))

#admin user만 입장가능함 
@auth_router.get("/search/all",dependencies=[role_checker])
async def get_all(session:AsyncSession=Depends(get_session),
                  user_credential=Depends(token_bearer)):
  users=await auth_service.get_users(session)
  return users

#모두 입장 가능
@auth_router.post("/login")
async def login(user_credentials:UserLogin,session:AsyncSession=Depends(get_session)):
  msg=await auth_service.create_bear_token(user_credentials,session)
  return msg