This is the second of a two part series on implementing authorization in a FastAPI application using Deta. In the previous article, we learned a bit about JWT, set up the project, and finished the building blocks of authorization logic. In this article, let's implement the logic, and deploy our app on Deta micros! The full code is available here.
Implementing the auth logic
Before we implement the auth logic, let's create a data model for login and signup details.
In user_model.py
:
from pydantic import BaseModel
class AuthModel(BaseModel):
username: str
password: str
This model represents the data that we can expect from the client when they hit /login
or /signup
endpoints.
Update the main.py
, with the following import statements
from auth import Auth
from user_model import AuthModel
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
Also, we will create an auth_handler
to access the logic from the Auth
class. We will use security
in our protected endpoints to access the token from the request header.
security = HTTPBearer()
auth_handler = Auth()
Here is the code for /signup
endpoint.
@app.post('/signup')
def signup(user_details: AuthModel):
if users_db.get(user_details.username) != None:
return 'Account already exists'
try:
hashed_password = auth_handler.encode_password(user_details.password)
user = {'key': user_details.username, 'password': hashed_password}
return users_db.put(user)
except:
error_msg = 'Failed to signup user'
return error_msg
In this function we are checking if a user with the username already exists in our users_db
. If so, we can simply return a message indicating that the account already exists. If the user doesn't already exists we can hash the password using the encode_password
function from auth.py
and store the user in users_db
. In case of any errors while adding the user to the base, we can return a failure message.
/login
endpoint is pretty simple. This also takes in the argument user_details
, which has the username and password.
@app.post('/login')
def login(user_details: AuthModel):
user = users_db.get(user_details.username)
if (user is None):
return HTTPException(status_code=401, detail='Invalid username')
if (not auth_handler.verify_password(user_details.password, user['password'])):
return HTTPException(status_code=401, detail='Invalid password')
access_token = auth_handler.encode_token(user['key'])
refresh_token = auth_handler.encode_refresh_token(user['key'])
return {'access_token': access_token, 'refresh_token': refresh_token}
If the account with the username doesn't exist, or if the hashed password in the users_db
doesn't match the input password we can simply raise an HTTPException
. Otherwise, we can return access_token
and refresh_token
.
@app.post('/secret')
def secret_data(credentials: HTTPAuthorizationCredentials = Security(security)):
token = credentials.credentials
if(auth_handler.decode_token(token)):
return 'Top Secret data only authorized users can access this info'
@app.get('/notsecret')
def not_secret_data():
return 'Not secret data'
The /secret
endpoint only returns the "Secret Data" if the token argument is valid. However, if the token is invalid or an expired token, then decode_token
raises a HTTPException
. The token is usually passed in the request header as Authorization: Bearer <token>
. Therefore, to get the token we can wrap the input credentials
around HTTPAuthorizationCredentials
tag. Now we can access the token from the request header in credentials.credentials
.
The /not_secret
endpoint is an example of an unprotected endpoint, which doesn't require any authentication.
@app.get('/refresh_token')
def refresh_token(credentials: HTTPAuthorizationCredentials = Security(security)):
refresh_token = credentials.credentials
new_token = auth_handler.refresh_token(refresh_token)
return {'access_token': new_token}
/refresh_token
endpoint is also pretty simple, it receives a refresh token which is then passed onto the the function from auth logic to get a new token.
Here is a look at main.py
at the end:
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from auth import Auth
from deta import Deta
from user_model import AuthModel
deta = Deta()
users_db = deta.Base('users')
app = FastAPI()
security = HTTPBearer()
auth_handler = Auth()
@app.post('/signup')
def signup(user_details: AuthModel):
if users_db.get(user_details.username) != None:
return 'Account already exists'
try:
hashed_password = auth_handler.encode_password(user_details.password)
user = {'key': user_details.username, 'password': hashed_password}
return users_db.put(user)
except:
error_msg = 'Failed to signup user'
return error_msg
@app.post('/login')
def login(user_details: AuthModel):
user = users_db.get(user_details.username)
if (user is None):
return HTTPException(status_code=401, detail='Invalid username')
if (not auth_handler.verify_password(user_details.password, user['password'])):
return HTTPException(status_code=401, detail='Invalid password')
access_token = auth_handler.encode_token(user['key'])
refresh_token = auth_handler.encode_refresh_token(user['key'])
return {'access_token': access_token, 'refresh_token': refresh_token}
@app.get('/refresh_token')
def refresh_token(credentials: HTTPAuthorizationCredentials = Security(security)):
refresh_token = credentials.credentials
new_token = auth_handler.refresh_token(refresh_token)
return {'access_token': new_token}
@app.post('/secret')
def secret_data(credentials: HTTPAuthorizationCredentials = Security(security)):
token = credentials.credentials
if(auth_handler.decode_token(token)):
return 'Top Secret data only authorized users can access this info'
@app.get('/notsecret')
def not_secret_data():
return 'Not secret data'
To test the app, go to the terminal in the same directory and run uvicorn main:app
, you can then go /docs
on the local endpoint (for me it was http://127.0.0.1:8000/docs
) to test the application.
/signup
curl -X 'POST' \
'http://127.0.0.1:8000/signup' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"username": "flyingsponge",
"password": "SpongePassword"
}'
Response Body
{
"key": "flyingsponge",
"password": "$2b$12$Ml66gTN0j4sAOkoDJ1aKnOmP0ye1FBNNk1QzEt0/6LgemgOUj469e"
}
/login
curl -X 'POST' \
'http://127.0.0.1:8000/login' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"username": "flyingsponge",
"password": "SpongePassword"
}'
Response Body
{
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTkwMTY4MzgsImlhdCI6MTYxOTAxNTAzOCwic3ViIjoiZmx5aW5nc3BvbmdlIn0.XnPaDwmj30M3vMOaPUOrqESIBNx0mctbjNW5jY8hIjQ",
"refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTkwNTEwMzgsImlhdCI6MTYxOTAxNTAzOCwic3ViIjoiZmx5aW5nc3BvbmdlIn0.Pm68HQFM6NwUxnEjFUxxTiOYT3KBqchD_e2g0LfP5Co"
}
/secret
curl -X 'POST' \
'http://127.0.0.1:8000/secret' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTkwMTY2MTIsImlhdCI6MTYxOTAxNDgxMiwic3ViIjoicmFta2kifQ.U8fUN1x1Gz18f7oA_y7Z9DE-1FIH9Ps0sDilwORvmI8' \
-d ''
Response body
"Top Secret data only authorized users can access this info"
/notsecret
curl -X 'GET' \
'http://127.0.0.1:8000/notsecret' \
-H 'accept: application/json'
Response Body
"Not secret data"
/secret
curl -X 'POST' \
'http://127.0.0.1:8000/secret' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTg5MzU3MzcsImlhdCI6MTYxODkzNTY3Nywic3ViIjoicm9oYW4ifQ.dja0E6SUaZfEvYVKySjLE9OLXOtob5pjpy3R_rlCD7c' \
-d ''
Response Body
{
"detail": "Token expired"
}
Now that the token is expired, let's get a new one
/refresh_token
curl -X 'GET' \
'http://127.0.0.1:8000/refresh_token' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTkwNTA3NzYsImlhdCI6MTYxOTAxNDc3Niwic3ViIjoicmFta2kifQ.2J0O4RKwEcABxe6hEX7ZshMo66J6D0dD6-hYeFLBFGg'
Response Body
{
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MTkwMTY2MTIsImlhdCI6MTYxOTAxNDgxMiwic3ViIjoicmFta2kifQ.U8fUN1x1Gz18f7oA_y7Z9DE-1FIH9Ps0sDilwORvmI8"
}
Deploy on Deta micros
Before we begin, make sure to install the Deta CLI. After installing, run the following commands in the same directory to deploy our app on Deta micros.
deta login
We also need to add a .env
file with the secret.
APP_SECRET_STRING=SECRET_STRING
Now we need to update our micro by doing:
deta new
deta update -e .env
deta deploy
deta visor disable
Summary
Our simple FastAPI application with JWT auth is now ready! As you can probably tell, we are not doing anything "secret" with our authorization. This article is just a template for implementing authorization. You can build on this template to build a fullstack application that relies on authorization. The full code is available here.
Top comments (9)
hey thanks for the article..
after i deploy on Deta, and get my access token, and send request to /secret , it says "Not authenticated" !
i cloned your code, add my project-secret-key,deploy it , but it doesn't work.. (the api works, auth part not working)
what should i do?
Can you run
deta visor disable
and see if that works?yeah man, it works..
thanks . :D
Great tutorial. Thank you. I just have one issue: I cloned your code and ran the app. I get an error:
File "./main.py", line 36, in login
access_token = auth_handler.encode_token(user['key'])
"TypeError: Expected a string value"
Nevermind, this was because I did not set the APP_SECRET_STRING value.
Nice explanation and work done . keep it up!
if refresh token expired need relogined? It`s bad.
Refresh token must be disposable.
After you call refresh_token function....that new token is for replace refresh_token or the access token ?...I'm a bit loss with the token flow..
Hey, yeah it is for the new access token. Feel free to dm me on twitter @spongedungeon if you have any questions.