FastAPI教程 使用密碼和 Bearer 的簡單 OAuth2

2021-11-03 10:12 更新

現(xiàn)在讓我們接著上一章繼續(xù)開發(fā),并添加缺少的部分以實(shí)現(xiàn)一個完整的安全性流程。

獲取 username 和 password

我們將使用 FastAPI 的安全性實(shí)用工具來獲取 username 和 password。

OAuth2 規(guī)定在使用(我們打算用的)「password 流程」時,客戶端/用戶必須將 username 和 password 字段作為表單數(shù)據(jù)發(fā)送。

而且規(guī)范明確了字段必須這樣命名。因此 user-name 或 email 是行不通的。

不過不用擔(dān)心,你可以在前端按照你的想法將它展示給最終用戶。

而且你的數(shù)據(jù)庫模型也可以使用你想用的任何其他名稱。

但是對于登錄路徑操作,我們需要使用這些名稱來與規(guī)范兼容(以具備例如使用集成的 API 文檔系統(tǒng)的能力)。

規(guī)范還寫明了 username 和 password 必須作為表單數(shù)據(jù)發(fā)送(因此,此處不能使用 JSON)。

scope

規(guī)范還提到客戶端可以發(fā)送另一個表單字段「scope」。

這個表單字段的名稱為 scope(單數(shù)形式),但實(shí)際上它是一個由空格分隔的「作用域」組成的長字符串。

每個「作用域」只是一個字符串(中間沒有空格)。

它們通常用于聲明特定的安全權(quán)限,例如:

  • users:read 或者 users:write 是常見的例子。
  • Facebook / Instagram 使用 instagram_basic。
  • Google 使用了 https://www.googleapis.com/auth/drive 。

Info

在 OAuth2 中「作用域」只是一個聲明所需特定權(quán)限的字符串。

它有沒有 : 這樣的其他字符或者是不是 URL 都沒有關(guān)系。

這些細(xì)節(jié)是具體的實(shí)現(xiàn)。

對 OAuth2 來說它們就只是字符串而已。

獲取 username 和 password 的代碼

現(xiàn)在,讓我們使用 FastAPI 提供的實(shí)用工具來處理此問題。

OAuth2PasswordRequestForm

首先,導(dǎo)入 OAuth2PasswordRequestForm,然后在 token 的路徑操作中通過 Depends 將其作為依賴項使用。

from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

OAuth2PasswordRequestForm 是一個類依賴項,聲明了如下的請求表單:

  • username。
  • password。
  • 一個可選的 scope 字段,是一個由空格分隔的字符串組成的大字符串。
  • 一個可選的 grant_type.

Tip

OAuth2 規(guī)范實(shí)際上要求 grant_type 字段使用一個固定的值 password,但是 OAuth2PasswordRequestForm 沒有作強(qiáng)制約束。

如果你需要強(qiáng)制要求這一點(diǎn),請使用 OAuth2PasswordRequestFormStrict 而不是 OAuth2PasswordRequestForm。

  • 一個可選的 client_id(我們的示例不需要它)。
  • 一個可選的 client_secret(我們的示例不需要它)。

Info

OAuth2PasswordRequestForm 并不像 OAuth2PasswordBearer 一樣是 FastAPI 的一個特殊的類。

OAuth2PasswordBearer 使得 FastAPI 明白它是一個安全方案。所以它得以通過這種方式添加到 OpenAPI 中。

但 OAuth2PasswordRequestForm 只是一個你可以自己編寫的類依賴項,或者你也可以直接聲明 Form 參數(shù)。

但是由于這是一種常見的使用場景,因此 FastAPI 出于簡便直接提供了它。

使用表單數(shù)據(jù)

Tip

類依賴項 OAuth2PasswordRequestForm 的實(shí)例不會有用空格分隔的長字符串屬性 scope,而是具有一個 scopes 屬性,該屬性將包含實(shí)際被發(fā)送的每個作用域字符串組成的列表。

在此示例中我們沒有使用 scopes,但如果你需要的話可以使用該功能。

現(xiàn)在,使用表單字段中的 username 從(偽)數(shù)據(jù)庫中獲取用戶數(shù)據(jù)。

如果沒有這個用戶,我們將返回一個錯誤消息,提示「用戶名或密碼錯誤」。

對于這個錯誤,我們使用 HTTPException 異常:

from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

校驗密碼

目前我們已經(jīng)從數(shù)據(jù)庫中獲取了用戶數(shù)據(jù),但尚未校驗密碼。

讓我們首先將這些數(shù)據(jù)放入 Pydantic UserInDB 模型中。

永遠(yuǎn)不要保存明文密碼,因此,我們將使用(偽)哈希密碼系統(tǒng)。

如果密碼不匹配,我們將返回同一個錯誤。

哈希密碼

「哈?!沟囊馑际牵簩⒛承﹥?nèi)容(在本例中為密碼)轉(zhuǎn)換為看起來像亂碼的字節(jié)序列(只是一個字符串)。

每次你傳入完全相同的內(nèi)容(完全相同的密碼)時,你都會得到完全相同的亂碼。

但是你不能從亂碼轉(zhuǎn)換回密碼。

為什么使用哈希密碼

如果你的數(shù)據(jù)庫被盜,小偷將無法獲得用戶的明文密碼,只有哈希值。

因此,小偷將無法嘗試在另一個系統(tǒng)中使用這些相同的密碼(由于許多用戶在任何地方都使用相同的密碼,因此這很危險)。

from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

關(guān)于 **user_dict

UserInDB(**user_dict) 表示:

直接將 user_dict 的鍵和值作為關(guān)鍵字參數(shù)傳遞,等同于:

UserInDB(
    username = user_dict["username"],
    email = user_dict["email"],
    full_name = user_dict["full_name"],
    disabled = user_dict["disabled"],
    hashed_password = user_dict["hashed_password"],
)

Info

有關(guān) user_dict 的更完整說明,請參閱額外的模型文檔

返回令牌

token 端點(diǎn)的響應(yīng)必須是一個 JSON 對象。

它應(yīng)該有一個 token_type。在我們的例子中,由于我們使用的是「Bearer」令牌,因此令牌類型應(yīng)為「bearer」。

并且還應(yīng)該有一個 access_token 字段,它是一個包含我們的訪問令牌的字符串。

對于這個簡單的示例,我們將極其不安全地返回相同的 username 作為令牌。

Tip

在下一章中,你將看到一個真實(shí)的安全實(shí)現(xiàn),使用了哈希密碼和 JWT 令牌。

但現(xiàn)在,讓我們僅關(guān)注我們需要的特定細(xì)節(jié)。

from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

Tip

根據(jù)規(guī)范,你應(yīng)該像本示例一樣,返回一個帶有 access_token 和 token_type 的 JSON。

這是你必須在代碼中自行完成的工作,并且要確保使用了這些 JSON 字段。

這幾乎是唯一的你需要自己記住并正確地執(zhí)行以符合規(guī)范的事情。

其余的,F(xiàn)astAPI 都會為你處理。

更新依賴項

現(xiàn)在我們將更新我們的依賴項。

我們想要僅當(dāng)此用戶處于啟用狀態(tài)時才能獲取 current_user。

因此,我們創(chuàng)建了一個額外的依賴項 get_current_active_user,而該依賴項又以 get_current_user 作為依賴項。

如果用戶不存在或處于未啟用狀態(tài),則這兩個依賴項都將僅返回 HTTP 錯誤。

因此,在我們的端點(diǎn)中,只有當(dāng)用戶存在,身份認(rèn)證通過且處于啟用狀態(tài)時,我們才能獲得該用戶:

from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

Info

我們在此處返回的值為 Bearer 的額外響應(yīng)頭 WWW-Authenticate 也是規(guī)范的一部分。

任何的 401「未認(rèn)證」HTTP(錯誤)狀態(tài)碼都應(yīng)該返回 WWW-Authenticate 響應(yīng)頭。

對于 bearer 令牌(我們的例子),該響應(yīng)頭的值應(yīng)為 Bearer。

實(shí)際上你可以忽略這個額外的響應(yīng)頭,不會有什么問題。

但此處提供了它以符合規(guī)范。

而且,(現(xiàn)在或?qū)恚┛赡軙泄ぞ咂谕玫讲⑹褂盟?,然后對你或你的用戶有用處?/p>

這就是遵循標(biāo)準(zhǔn)的好處...

實(shí)際效果

打開交互式文檔:http://127.0.0.1:8000/docs。

身份認(rèn)證

點(diǎn)擊「Authorize」按鈕。

使用以下憑證:

用戶名:johndoe

密碼:secret

在系統(tǒng)中進(jìn)行身份認(rèn)證后,你將看到:

獲取本人的用戶數(shù)據(jù)

現(xiàn)在執(zhí)行 /users/me 路徑的 GET 操作。

你將獲得你的用戶數(shù)據(jù),如:

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false,
  "hashed_password": "fakehashedsecret"
}

如果你點(diǎn)擊鎖定圖標(biāo)并注銷,然后再次嘗試同一操作,則會得到 HTTP 401 錯誤:

{
  "detail": "Not authenticated"
}

未啟用的用戶

現(xiàn)在嘗試使用未啟用的用戶,并通過以下方式進(jìn)行身份認(rèn)證:

用戶名:alice

密碼:secret2

然后嘗試執(zhí)行 /users/me 路徑的 GET 操作。

你將得到一個「未啟用的用戶」錯誤,如:

{
  "detail": "Inactive user"
}

總結(jié)

現(xiàn)在你掌握了為你的 API 實(shí)現(xiàn)一個基于 username 和 password 的完整安全系統(tǒng)的工具。

使用這些工具,你可以使安全系統(tǒng)與任何數(shù)據(jù)庫以及任何用戶或數(shù)據(jù)模型兼容。

唯一缺少的細(xì)節(jié)是它實(shí)際上還并不「安全」。

在下一章中,你將看到如何使用一個安全的哈希密碼庫和 JWT 令牌。


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號