FastAPI is essentially an API wrapper for Starlette. To fully grasp FastAPI, one must first understand Starlette.
1. ASGI Protocol
Uvicorn talks to ASGI applications through a common interface. An application can exchange data with Uvicorn by implementing a callable like the following:
```python
async def app(scope, receive, send):
    # The simplest ASGI application
    assert scope['type'] == 'http'
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/plain'],
        ]
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello, world!',
    })


if __name__ == "__main__":
    # Start the Uvicorn server
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")
```
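Because the application is just an async callable, the protocol can be exercised without a server at all. The sketch below drives a hand-rolled ASGI app with stub `receive`/`send` callables (the stubs and the `main` wrapper are mine, for illustration only):

```python
import asyncio


async def app(scope, receive, send):
    # Same shape as the app above: one start message, one body message
    assert scope["type"] == "http"
    await send({"type": "http.response.start", "status": 200,
                "headers": [[b"content-type", b"text/plain"]]})
    await send({"type": "http.response.body", "body": b"Hello, world!"})


async def main():
    sent = []

    async def receive():
        # No request body in this sketch
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        # Collect outgoing messages instead of writing to a socket
        sent.append(message)

    await app({"type": "http"}, receive, send)
    return sent


messages = asyncio.run(main())
```

After running, `messages` holds exactly the two protocol messages the app emitted, which is all an ASGI server like Uvicorn ever sees.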
2. Starlette
To start Starlette with Uvicorn, use the following code:
```python
from starlette.applications import Starlette
from starlette.middleware.gzip import GZipMiddleware

app: Starlette = Starlette()


@app.route("/")
def demo_route() -> None: pass


@app.websocket_route("/")
def demo_websocket_route() -> None: pass


@app.exception_handler(404)
def not_found_route() -> None: pass


@app.on_event("startup")
def startup_event_demo() -> None: pass


@app.on_event("shutdown")
def shutdown_event_demo() -> None: pass


app.add_middleware(GZipMiddleware)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=5000)
```
This code initializes Starlette, registers routes, exception handlers, events, and middleware, and then passes the app to `uvicorn.run`. Uvicorn then delivers request data by calling Starlette's `__call__` method.
Let's first analyze the initialization of Starlette:
```python
class Starlette:
    def __init__(
        self,
        debug: bool = False,
        routes: typing.Sequence[BaseRoute] = None,
        middleware: typing.Sequence[Middleware] = None,
        exception_handlers: typing.Dict[
            typing.Union[int, typing.Type[Exception]], typing.Callable
        ] = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[["Starlette"], typing.AsyncGenerator] = None,
    ) -> None:
        """
        :param debug: Whether to enable debug mode.
        :param routes: A list of routes providing HTTP and WebSocket services.
        :param middleware: A list of middleware applied to every request.
        :param exception_handlers: A dict of exception callbacks, keyed by
            HTTP status code or exception class, with callbacks as values.
        :param on_startup: Callbacks invoked during application startup.
        :param on_shutdown: Callbacks invoked during application shutdown.
        :param lifespan: The ASGI lifespan function.
        """
        # If lifespan is passed, on_startup and on_shutdown must not be,
        # because Starlette converts on_startup and on_shutdown into a
        # lifespan for Uvicorn to call.
        assert lifespan is None or (
            on_startup is None and on_shutdown is None
        ), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both."
        # Initialize variables
        self._debug = debug
        self.state = State()
        self.router = Router(
            routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan
        )
        self.exception_handlers = (
            {} if exception_handlers is None else dict(exception_handlers)
        )
        self.user_middleware = [] if middleware is None else list(middleware)
        # Build the middleware stack
        self.middleware_stack = self.build_middleware_stack()
```
As the code shows, initialization covers most of the setup. However, the function that builds the middleware stack deserves further analysis:
```python
class Starlette:
    def build_middleware_stack(self) -> ASGIApp:
        debug = self.debug
        error_handler = None
        exception_handlers = {}
        # Split exception callbacks between error_handler and exception_handlers.
        # Only handlers for HTTP status 500 (or bare Exception) go to error_handler.
        for key, value in self.exception_handlers.items():
            if key in (500, Exception):
                error_handler = value
            else:
                exception_handlers[key] = value
        # Order the different kinds of middleware:
        # 1. ServerErrorMiddleware: prints the error stack when an unhandled
        #    exception surfaces, or shows an error page in debug mode.
        # 2. The user middleware layer, holding all user-registered middleware.
        # 3. ExceptionMiddleware: the exception-handling layer, which handles
        #    all exceptions thrown during route execution.
        middleware = (
            [Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug)]
            + self.user_middleware
            + [
                Middleware(
                    ExceptionMiddleware, handlers=exception_handlers, debug=debug
                )
            ]
        )
        # Finally, wrap the app in the middleware
        app = self.router
        for cls, options in reversed(middleware):
            # cls is the middleware class itself; options are the parameters we pass.
            # Each middleware is itself an ASGI app, so loading middleware nests
            # one ASGI app inside another, like a matryoshka doll.
            app = cls(app=app, **options)
        # Because the middleware is nested and each layer calls the next via the
        # app it wraps, the list is traversed in reverse order.
        return app
```
After the middleware stack is built, initialization is complete. Uvicorn then dispatches each request by calling the `__call__` method:
```python
class Starlette:
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        scope["app"] = self
        await self.middleware_stack(scope, receive, send)
```
This method is simple: it stores the app in `scope` so later stages of the request flow can reach it, then starts request processing by calling `middleware_stack`. From this method and the middleware initialization, we can see that middleware in Starlette is itself an ASGI app (and, at the bottom of the call stack, so is the route). Starlette also delegates exception handling to the middleware, which is rare among web frameworks. Clearly, Starlette is designed so that each component is, as far as possible, an ASGI app.
2. Middleware
As mentioned above, middleware in Starlette is an ASGI app, so every middleware must be a class of the following form:
```python
class BaseMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        pass

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        pass
```
`starlette.middleware` contains many middleware implementations that meet this requirement. This chapter will not cover them all, but selects a few representative ones for analysis, ordered from closest to the route to farthest.
2.1. Exception Handling Middleware - ExceptionMiddleware
The first is ExceptionMiddleware. Users do not interact with this middleware directly (which is why it does not live in `starlette.middleware`), but indirectly through the following method:

```python
@app.exception_handler(404)
def not_found_route() -> None: pass
```
When the user calls this method, Starlette stores the callback in the corresponding dictionary, keyed by HTTP status code, with the callback function as the value.
When ExceptionMiddleware finds that route request handling raised an exception, it looks up the corresponding callback by the exception response's HTTP status code, passes the request and exception to the user's callback, and finally sends the response produced by the callback back to the previous ASGI app.
In addition, ExceptionMiddleware also supports registration by exception class: when the exception thrown by the route matches a registered exception type, the callback registered for that type is called.
The source code and comments of this class are as follows:
```python
class ExceptionMiddleware:
    def __init__(
        self, app: ASGIApp, handlers: dict = None, debug: bool = False
    ) -> None:
        self.app = app
        self.debug = debug  # TODO: We ought to handle 404 cases if debug is set.
        # Starlette supports both HTTP status codes and Exception types as keys
        self._status_handlers = {}  # type: typing.Dict[int, typing.Callable]
        self._exception_handlers = {
            HTTPException: self.http_exception
        }  # type: typing.Dict[typing.Type[Exception], typing.Callable]
        if handlers is not None:
            for key, value in handlers.items():
                self.add_exception_handler(key, value)

    def add_exception_handler(
        self,
        exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
        handler: typing.Callable,
    ) -> None:
        # Exception callbacks registered through the Starlette app end up in
        # _status_handlers or _exception_handlers via this method.
        if isinstance(exc_class_or_status_code, int):
            self._status_handlers[exc_class_or_status_code] = handler
        else:
            assert issubclass(exc_class_or_status_code, Exception)
            self._exception_handlers[exc_class_or_status_code] = handler

    def _lookup_exception_handler(
        self, exc: Exception
    ) -> typing.Optional[typing.Callable]:
        # Look up the callback registered for this exception by walking the MRO.
        #
        # The user may register a base class, in which case subclasses of that
        # exception also trigger the base class's callback.
        # For example, the user registers a base class, and two exceptions
        # (a user exception and a system exception) both inherit from it.
        # When either exception is later thrown, the callback registered for
        # the base class is executed.
        for cls in type(exc).__mro__:
            if cls in self._exception_handlers:
                return self._exception_handlers[cls]
        return None

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # The familiar ASGI call method
        if scope["type"] != "http":
            # WebSocket requests are not supported
            await self.app(scope, receive, send)
            return

        # Prevent multiple exceptions from being handled in the same response
        response_started = False

        async def sender(message: Message) -> None:
            nonlocal response_started
            if message["type"] == "http.response.start":
                response_started = True
            await send(message)

        try:
            # Call the next ASGI app
            await self.app(scope, receive, sender)
        except Exception as exc:
            handler = None
            if isinstance(exc, HTTPException):
                # For an HTTPException, look in the registered status-code dict
                handler = self._status_handlers.get(exc.status_code)
            if handler is None:
                # Otherwise, look in the exception-class dict
                handler = self._lookup_exception_handler(exc)
            if handler is None:
                # If no matching handler is found, re-raise upwards
                raise exc from None
            # Only handle one exception per response
            if response_started:
                msg = "Caught handled exception, but response already started."
                raise RuntimeError(msg) from exc
            request = Request(scope, receive=receive)
            if asyncio.iscoroutinefunction(handler):
                response = await handler(request, exc)
            else:
                response = await run_in_threadpool(handler, request, exc)
            # Complete the request with the response generated by the callback
            await response(scope, receive, sender)
```
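The `__mro__` walk in `_lookup_exception_handler` can be reproduced in isolation. The exception classes below are made up for illustration; the point is that a handler registered for a base class also catches its subclasses:

```python
class AppBaseError(Exception):
    """Hypothetical base class registered by the user."""


class UserError(AppBaseError):
    pass


class SystemFault(AppBaseError):
    pass


# Only the base class has a registered handler
handlers = {AppBaseError: lambda exc: f"base handled {type(exc).__name__}"}


def lookup(exc: Exception):
    # Walk the MRO so a handler registered for a base class
    # also matches its subclasses
    for cls in type(exc).__mro__:
        if cls in handlers:
            return handlers[cls]
    return None


result = lookup(UserError("boom"))(UserError("boom"))
assert lookup(KeyError("x")) is None  # unrelated exceptions match nothing
```

Here `UserError` has no handler of its own, but its MRO contains `AppBaseError`, so the base class's callback runs.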
2.2. User Middleware
Next is user middleware, the kind we interact with most. When using `starlette.middleware`, we usually inherit from `BaseHTTPMiddleware` and extend code of the following form:
```python
class DemoMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: ASGIApp) -> None:
        super(DemoMiddleware, self).__init__(app)

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        # Before
        response: Response = await call_next(request)
        # After
        return response
```
Preprocessing goes in the `# Before` block; post-processing goes in the `# After` block. This is very simple to use, and both blocks share the same scope, so variables in this method do not need to be propagated through a context object or dynamic variables (if you have worked with middleware in Django or Flask, you will appreciate the elegance of Starlette's implementation).
Now let's see how it is implemented. The code is short (about 60 lines) but heavily commented:
```python
class BaseHTTPMiddleware:
    def __init__(self, app: ASGIApp, dispatch: DispatchFunction = None) -> None:
        # Store the next-level ASGI app
        self.app = app
        # If the user passes dispatch, use it; otherwise use the class's own.
        # Usually, users inherit from BaseHTTPMiddleware and override dispatch.
        self.dispatch_func = self.dispatch if dispatch is None else dispatch

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        A function with the standard ASGI signature; ASGI requests enter here.
        """
        if scope["type"] != "http":
            # Non-HTTP requests bypass the middleware (WebSocket is unsupported).
            # Supporting WebSocket in this style of middleware is very hard;
            # when I implemented the rap framework, I sacrificed some features
            # to achieve middleware handling for WebSocket-like traffic.
            await self.app(scope, receive, send)
            return
        # Build a Request object from scope
        request = Request(scope, receive=receive)
        # Enter the dispatch logic, i.e. the user's processing logic.
        # The response it yields is actually produced by call_next;
        # dispatch merely passes it along.
        response = await self.dispatch_func(request, self.call_next)
        # Send data back to the upper layer based on the generated response
        await response(scope, receive, send)

    async def call_next(self, request: Request) -> Response:
        loop = asyncio.get_event_loop()
        # Receive the next level's messages through a producer/consumer queue
        queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()

        scope = request.scope
        # Forward uvicorn's receive object via request.receive;
        # this is still the receive object that uvicorn initialized
        receive = request.receive
        send = queue.put

        async def coro() -> None:
            try:
                await self.app(scope, receive, send)
            finally:
                # This put guarantees the consumer side is never left blocked
                await queue.put(None)

        # Run the next ASGI app in another coroutine via loop.create_task
        task = loop.create_task(coro())
        # Wait for the next ASGI app's first message
        message = await queue.get()
        if message is None:
            # A None here means the next ASGI app produced no response,
            # so an error may have occurred.
            # task.result() re-raises the coroutine's exception, if any.
            task.result()
            # If no exception was raised, the cause may be user error, such as
            # returning an empty response. Since no response can be sent to the
            # client, raise an exception so a 500 response is generated later.
            raise RuntimeError("No response returned.")
        # An ASGI response is delivered in several steps; normally the
        # queue.get above yields the first step, http.response.start.
        assert message["type"] == "http.response.start"

        async def body_stream() -> typing.AsyncGenerator[bytes, None]:
            # The remaining steps are handled by body_stream,
            # which simply keeps yielding the body data stream
            while True:
                message = await queue.get()
                if message is None:
                    break
                assert message["type"] == "http.response.body"
                yield message.get("body", b"")
            task.result()

        # Wrap body_stream in a StreamingResponse; the response itself is also
        # an ASGI-app-like class that sends its data when called
        response = StreamingResponse(
            status_code=message["status"], content=body_stream()
        )
        response.raw_headers = message["headers"]
        return response
```
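The queue-plus-sentinel pattern at the heart of `call_next` can be isolated into a small runnable sketch. The `inner_app` and message shapes below are illustrative stand-ins for the next ASGI app:

```python
import asyncio


async def demo():
    queue: asyncio.Queue = asyncio.Queue()

    async def inner_app():
        # The "next ASGI app": one start message, then body chunks
        await queue.put({"type": "http.response.start", "status": 200})
        await queue.put({"type": "http.response.body", "body": b"chunk1"})
        await queue.put({"type": "http.response.body", "body": b"chunk2"})

    async def coro():
        try:
            await inner_app()
        finally:
            # Sentinel: guarantees the consumer is never left blocked
            await queue.put(None)

    # Produce in a separate task, consume here
    task = asyncio.create_task(coro())
    message = await queue.get()
    assert message["type"] == "http.response.start"

    body = b""
    while True:
        message = await queue.get()
        if message is None:
            break
        body += message["body"]
    task.result()  # re-raises any exception from the producer
    return body


body = asyncio.run(demo())
```

The `finally: put(None)` is the key detail: even if the producer raises, the consumer wakes up, and `task.result()` then surfaces the producer's exception instead of hanging.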
2.3. ServerErrorMiddleware
ServerErrorMiddleware is quite similar to ExceptionMiddleware (so this part won't be elaborated further). Their overall logic is mostly the same. However, while ExceptionMiddleware is responsible for capturing and handling routing exceptions, ServerErrorMiddleware mainly serves as a fallback measure to ensure that a legitimate HTTP response is always returned.
ServerErrorMiddleware is registered indirectly in the same way as ExceptionMiddleware, but only a handler registered for HTTP status code 500 ends up in ServerErrorMiddleware:

```python
@app.exception_handler(500)
def server_error_route() -> None: pass
```
ServerErrorMiddleware sits at the outermost layer of the ASGI app stack and takes on fallback exception handling. What it does is simple: if an exception escapes while the next-level ASGI app processes a request, it enters the fallback logic:
- 1. If debug is enabled, return the debug page.
- 2. If there is a registered callback, execute the registered callback.
- 3. If neither of the above, return a 500 response.
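The three-step fallback can be sketched with a minimal wrapper. This is an illustrative stand-in, not the real ServerErrorMiddleware (which, among other things, re-raises the exception after responding so the server can log it):

```python
import asyncio


class ServerErrorSketch:
    # Illustrative fallback wrapper; message shapes are simplified dicts
    def __init__(self, app, handler=None, debug=False):
        self.app, self.handler, self.debug = app, handler, debug

    async def __call__(self, scope, receive, send):
        try:
            await self.app(scope, receive, send)
        except Exception as exc:
            if self.debug:
                # 1. Debug mode: return the debug page
                await send({"status": 500, "body": b"debug traceback page"})
            elif self.handler is not None:
                # 2. A registered 500 callback: let it build the response
                await send(await self.handler(scope, exc))
            else:
                # 3. Neither: return a plain 500 response
                await send({"status": 500, "body": b"Internal Server Error"})


async def failing_app(scope, receive, send):
    raise ValueError("boom")


sent = []


async def send(message):
    sent.append(message)


asyncio.run(ServerErrorSketch(failing_app)({}, None, send))
```

With no debug flag and no handler, the wrapper falls through to the plain 500 branch, so a legitimate HTTP response is always produced.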
3. Route
In Starlette, routing is divided into two parts. One part, which I call the real app's Router, sits at the level just below the middleware. It is responsible for almost everything in Starlette other than the middleware, mainly route lookup and matching plus application startup and shutdown handling. The other part consists of the routes registered with the Router.
3.1. Router
The Router is straightforward; its main responsibility is to load and match routes. Here are the source code and comments, excluding the route-loading part:
```python
class Router:
    def __init__(
        self,
        routes: typing.Sequence[BaseRoute] = None,
        redirect_slashes: bool = True,
        default: ASGIApp = None,
        on_startup: typing.Sequence[typing.Callable] = None,
        on_shutdown: typing.Sequence[typing.Callable] = None,
        lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
    ) -> None:
        # Load the values passed in from Starlette's initialization
        self.routes = [] if routes is None else list(routes)
        self.redirect_slashes = redirect_slashes
        self.default = self.not_found if default is None else default
        self.on_startup = [] if on_startup is None else list(on_startup)
        self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)

        async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
            await self.startup()
            yield
            await self.shutdown()

        # If no lifespan was supplied, convert on_startup and on_shutdown
        # into a lifespan
        self.lifespan_context = default_lifespan if lifespan is None else lifespan

    async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Logic executed when no route matches"""
        if scope["type"] == "websocket":
            # WebSocket match failed
            websocket_close = WebSocketClose()
            await websocket_close(scope, receive, send)
            return

        # If we're running inside a starlette application then raise an
        # exception, so that the configurable exception handler can deal with
        # returning the response. For plain ASGI apps, just return the response.
        if "app" in scope:
            # As seen in the __call__ method of starlette.applications,
            # Starlette stores itself in scope. The exception raised here
            # can then be caught by ExceptionMiddleware.
            raise HTTPException(status_code=404)
        else:
            # For calls not coming from Starlette, return an error directly
            response = PlainTextResponse("Not Found", status_code=404)
            await response(scope, receive, send)

    async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Handle ASGI lifespan messages, which allows us to manage application
        startup and shutdown events.
        """
        # Lifespan execution logic: Starlette communicates with the ASGI
        # server here. Parts of this code may still be under development.
        first = True
        app = scope.get("app")
        await receive()
        try:
            if inspect.isasyncgenfunction(self.lifespan_context):
                async for item in self.lifespan_context(app):
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
            else:
                for item in self.lifespan_context(app):  # type: ignore
                    assert first, "Lifespan context yielded multiple times."
                    first = False
                    await send({"type": "lifespan.startup.complete"})
                    await receive()
        except BaseException:
            if first:
                exc_text = traceback.format_exc()
                await send({"type": "lifespan.startup.failed", "message": exc_text})
            raise
        else:
            await send({"type": "lifespan.shutdown.complete"})

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        The main entry point to the Router class.
        """
        # The main function for matching and executing routes.
        # Only the http, websocket, and lifespan types are supported.
        assert scope["type"] in ("http", "websocket", "lifespan")

        # Store the router in scope
        if "router" not in scope:
            scope["router"] = self

        if scope["type"] == "lifespan":
            # Execute the lifespan logic
            await self.lifespan(scope, receive, send)
            return

        partial = None

        # Perform route matching
        for route in self.routes:
            match, child_scope = route.matches(scope)
            if match == Match.FULL:
                # Full match (both URL and method match):
                # proceed with normal route handling
                scope.update(child_scope)
                await route.handle(scope, receive, send)
                return
            elif match == Match.PARTIAL and partial is None:
                # Partial match (URL matches, method does not):
                # remember it and keep matching
                partial = route
                partial_scope = child_scope

        if partial is not None:
            # A partially matched route is still executed, but it responds
            # with an HTTP method error
            scope.update(partial_scope)
            await partial.handle(scope, receive, send)
            return

        if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
            # Nothing matched: check whether a slash redirect would help
            redirect_scope = dict(scope)
            if scope["path"].endswith("/"):
                redirect_scope["path"] = redirect_scope["path"].rstrip("/")
            else:
                redirect_scope["path"] = redirect_scope["path"] + "/"

            for route in self.routes:
                match, child_scope = route.matches(redirect_scope)
                if match != Match.NONE:
                    # Match again; on any hit, send a redirect response
                    redirect_url = URL(scope=redirect_scope)
                    response = RedirectResponse(url=str(redirect_url))
                    await response(scope, receive, send)
                    return

        # If nothing above hit, no route was found; execute the default
        # route, which by default responds 404 Not Found
        await self.default(scope, receive, send)
```
As you can see, the Router code is quite simple, with most of it concentrated in the `__call__` method. However, matching traverses the route list, and each route runs a regular expression to decide whether it matches. Some may suspect this is slow. I used to think so too, and implemented a route trie to replace it (see route_trie.py for details). But after benchmarking, I found that with up to 50 routes, loop matching outperforms the route trie; up to about 100, the two are equivalent. Under normal circumstances applications rarely define more than 100 routes, so there is no need to worry about matching performance here. If you are still concerned, you can use `Mount` to group routes and reduce the number of matches.
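The linear matching loop, including the FULL/PARTIAL distinction, can be sketched with plain regular expressions. The route table and helper below are illustrative, not Starlette's actual classes:

```python
import re
from enum import Enum


class Match(Enum):
    NONE = 0
    PARTIAL = 1
    FULL = 2


# Toy route table: (compiled path regex, allowed methods)
routes = [
    (re.compile(r"^/users/(?P<user_id>\d+)$"), {"GET"}),
    (re.compile(r"^/items$"), {"GET", "POST"}),
]


def match_route(path, method):
    partial = None
    for pattern, methods in routes:
        m = pattern.match(path)
        if m:
            if method in methods:
                # URL and method both match
                return Match.FULL, m.groupdict()
            # URL matches but method does not: remember it and keep looking
            if partial is None:
                partial = (Match.PARTIAL, m.groupdict())
    return partial or (Match.NONE, {})


kind, params = match_route("/users/42", "GET")
kind2, _ = match_route("/items", "DELETE")
kind3, _ = match_route("/missing", "GET")
```

`/users/42` with `GET` is a full match carrying `{"user_id": "42"}`; `/items` with `DELETE` is only partial (the real Router would answer 405); `/missing` matches nothing and falls through to the 404 default.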
3.2. Other Routes
`Mount` inherits from `BaseRoute`, as do the other route classes such as `Route`, `WebSocketRoute`, and `Host`. They provide similar methods, differing only slightly in implementation (mainly in initialization, route matching, and reverse lookup). Let's look at `BaseRoute` first:
```python
class BaseRoute:
    def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
        # The standard matching signature: each route returns a (Match, Scope) tuple.
        # Match has 3 values:
        #   NONE: no match.
        #   PARTIAL: partial match (URL matches, method does not).
        #   FULL: full match (both URL and method match).
        # The scope generally has the following shape, though Mount returns more:
        #   {"endpoint": self.endpoint, "path_params": path_params}
        raise NotImplementedError()  # pragma: no cover

    def url_path_for(self, name: str, **path_params: str) -> URLPath:
        # Generate a reverse lookup (URL path) from the route's name
        raise NotImplementedError()  # pragma: no cover

    async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Called after the route has been matched by the Router
        raise NotImplementedError()  # pragma: no cover

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        A route can be used in isolation as a stand-alone ASGI app.

        This is a somewhat contrived case, as they're almost always used
        within a Router, but could be useful for some tooling and minimal apps.
        """
        # When called as a stand-alone ASGI app, the route matches and
        # responds on its own
        match, child_scope = self.matches(scope)
        if match == Match.NONE:
            if scope["type"] == "http":
                response = PlainTextResponse("Not Found", status_code=404)
                await response(scope, receive, send)
            elif scope["type"] == "websocket":
                websocket_close = WebSocketClose()
                await websocket_close(scope, receive, send)
            return

        scope.update(child_scope)
        await self.handle(scope, receive, send)
```
As you can see, `BaseRoute` itself does not provide much; the other routes extend it:

- Route: A standard HTTP route. It matches on the HTTP URL and HTTP method, then provides a way to call the HTTP route's endpoint.
- WebSocketRoute: A standard WebSocket route. It matches on the HTTP URL, then creates a session through the `WebSocket` class in `starlette.websockets` and passes it to the corresponding function.
- Mount: A nested encapsulation of routes. It matches on a URL prefix and forwards requests to the next-level ASGI app that satisfies the rule. When that next-level app is a Router, the call chain may look like Router->Mount->Router->Mount->Router->Route. `Mount` lets you group routes and speed up matching, which is recommended. It can also distribute requests to other ASGI apps, e.g. Starlette->ASGI Middleware->Mount->Other Starlette->...
- Host: Distributes requests to the corresponding ASGI app according to the `Host` header of the request; the target can be a `Route`, a `Mount`, middleware, etc.
4. Other Components
As seen above, most components in Starlette are designed as ASGI apps. Although this sacrifices a little performance, it makes the components extremely composable, and the remaining components are also, more or less, designed like ASGI apps. Before introducing them, let's look at Starlette's overall project structure:
├── middleware # Middleware
├── applications.py # Application entry point (the Starlette class)
├── authentication.py # Authentication support
├── background.py # Background tasks, executed after the response is sent
├── concurrency.py # Small asyncio wrappers; newer versions use the anyio library instead
├── config.py # Configuration
├── convertors.py # Type conversion helpers
├── datastructures.py # Data structures such as URL, Headers, FormData, QueryParams, State
├── endpoints.py # Class-based-view routes and a slightly higher-level WebSocket wrapper
├── exceptions.py # Exception handling
├── formparsers.py # Parsing of forms, files, etc.
├── graphql.py # GraphQL support
├── __init__.py
├── py.typed # PEP 561 marker: the package ships type hints
├── requests.py # Request objects, letting users read request data
├── responses.py # Response classes: initialize headers and cookies, generate response data per Response type, and expose an ASGI-style call interface that sends ASGI messages to the Uvicorn server; afterwards, any background tasks run to completion
├── routing.py # Routing
├── schemas.py # OpenAPI-related schemas
├── staticfiles.py # Static files
├── status.py # HTTP status codes
├── templating.py # Jinja-based template responses
├── testclient.py # Test client
├── types.py # Type definitions
└── websockets.py # WebSocket
There are many files above, and some simpler ones will be skipped.
4.1. Request
The `Request` class is very simple. It inherits from `HTTPConnection`, which mainly parses the `scope` passed via the ASGI protocol to extract information such as the URL and method. On top of that, `Request` adds functions for reading request data and receiving pushed data (HTTP/1.1 allows the server to push data to the client). Reading data depends on one core function, `stream`. Its source code is as follows:
```python
async def stream(self) -> typing.AsyncGenerator[bytes, None]:
    # If the body has already been read, serve it from the cache
    if hasattr(self, "_body"):
        yield self._body
        yield b""
        return

    if self._stream_consumed:
        raise RuntimeError("Stream consumed")

    self._stream_consumed = True
    while True:
        # Keep pulling data from the ASGI server's receive loop
        message = await self._receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            if body:
                # Yield the data if it is not empty
                yield body
            if not message.get("more_body", False):
                # All body data has been received
                break
        elif message["type"] == "http.disconnect":
            # The connection with the client has been closed
            self._is_disconnected = True
            # Raise, so that `await request.body()` or `await request.json()`
            # raises in the caller
            raise ClientDisconnect()
    # Yield empty bytes to mark the end
    yield b""
```
This implementation is simple, but it hides a small bug. Those familiar with Nginx or other web servers know that intermediate servers generally do not process body data, only pass it along, and the same is true for ASGI. After processing the URL and headers, Uvicorn calls the ASGI app and passes the `send` and `receive` objects down; these two objects travel through multiple ASGI apps until they reach the route's ASGI app for use in handler functions. So the `receive` object held by a `Request` is the one created by Uvicorn, and its data source is an `asyncio.Queue`. From the middleware analysis, we know each ASGI app builds its own `Request` object from `scope` and `receive`, which means the `Request` objects at each layer are distinct. If a middleware reads the body through its `Request` object, it consumes the queue's data ahead of time via `receive`, leaving later ASGI apps unable to read the body through their own `Request` objects. The following sample code reproduces the problem:
```python
import asyncio

from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

app: Starlette = Starlette()


class DemoMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        print(request, await request.body())
        return await call_next(request)


app.add_middleware(DemoMiddleware)


@app.route("/")
async def demo(request: Request) -> JSONResponse:
    try:
        await asyncio.wait_for(request.body(), 1)
        return JSONResponse({"result": True})
    except asyncio.TimeoutError:
        return JSONResponse({"result": False})


if __name__ == "__main__":
    import uvicorn  # type: ignore

    uvicorn.run(app)
```
Run it and send a request to check the result:

```shell
-> curl http://127.0.0.1:8000
{"result":false}
```
As shown above, the result is `false`: `request.body` timed out because the `receive` queue was already drained and no data could be retrieved. Without the timeout, this request would hang forever.
To solve this problem, look first at how `Request` gets the body. Since the user can read the body multiple times and always gets the same data, the implementation caches the data after the first read. We can follow the same idea: since data arrives via `receive`, after reading the body we can construct a replacement `receive` function that returns a message in the shape of the ASGI protocol, carrying the complete body (which satisfies what `Request.stream` needs to read the body). The code is as follows:
```python
async def proxy_get_body(request: Request) -> bytes:
    async def receive() -> Message:
        return {"type": "http.request", "body": body}

    body = await request.body()
    request._receive = receive
    return body
```
Subsequently, any layer of the ASGI app that needs the body can call this function to obtain it without affecting later ASGI apps' ability to read the body.
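The trick can be verified without a running server. The `MiniRequest` class below is a minimal stand-in for Starlette's `Request` (it implements only the `body()` loop over `_receive`), and the one-shot `receive` mimics uvicorn's queue being drained after the first read:

```python
import asyncio


class MiniRequest:
    # Minimal stand-in for starlette's Request body reading
    def __init__(self, receive):
        self._receive = receive

    async def body(self):
        chunks = []
        while True:
            message = await self._receive()
            chunks.append(message.get("body", b""))
            if not message.get("more_body", False):
                break
        return b"".join(chunks)


async def proxy_get_body(request):
    # Read once, then swap in a receive() that replays the cached body
    body = await request.body()

    async def receive():
        return {"type": "http.request", "body": body, "more_body": False}

    request._receive = receive
    return body


async def main():
    pending = [{"type": "http.request", "body": b"payload", "more_body": False}]

    async def receive():
        # One-shot source: empty after the first read, like a drained queue
        return pending.pop(0)

    request = MiniRequest(receive)
    first = await proxy_get_body(request)
    second = await request.body()  # still works: reads the replayed receive
    return first, second


first, second = asyncio.run(main())
```

Without the swap, the second `body()` call would hit the exhausted source; with it, every subsequent reader sees the cached body.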
5. Summary
By now, we have analyzed several important functional codes of Starlette. Starlette is an excellent library with a great design concept. It is recommended that you read the Starlette source code by yourself, which will be helpful for writing your own frameworks in the future.
Leapcell: The Best Serverless Platform for Python App Hosting
Finally, I'd like to share the most suitable platform for deploying FastAPI: Leapcell
1. Multi-Language Support
- Develop with JavaScript, Python, Go, or Rust.
2. Deploy Unlimited Projects for Free
- Pay only for usage: no requests, no charges.
3. Unbeatable Cost Efficiency
- Pay-as-you-go with no idle charges.
- Example: $25 supports 6.94M requests at a 60ms average response time.
4. Streamlined Developer Experience
- Intuitive UI for effortless setup.
- Fully automated CI/CD pipelines and GitOps integration.
- Real-time metrics and logging for actionable insights.
5. Effortless Scalability and High Performance
- Auto-scaling to handle high concurrency with ease.
- Zero operational overhead: just focus on building.
Explore more in the documentation!
Leapcell Twitter: https://x.com/LeapcellHQ