While asynchronous route handlers (`async def`) are powerful for I/O-bound tasks, directly running CPU-intensive operations like machine learning model inference within them poses a significant problem. Python's `asyncio` relies on a single-threaded event loop to manage concurrent tasks. If a function within an `async def` route performs a long-running computation without yielding control (i.e., without using `await` on an operation that allows the event loop to switch tasks), it effectively freezes the event loop. During this time, the server cannot respond to any other incoming requests, defeating the purpose of using an asynchronous framework for high concurrency.
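To make the effect concrete, here is a minimal, standalone sketch (plain `asyncio`, independent of FastAPI) showing how a synchronous `time.sleep()` inside a coroutine stalls everything else scheduled on the loop, whereas an awaited `asyncio.sleep()` would not:

```python
import asyncio
import time

async def blocking_work():
    # Synchronous blocking call: the event loop cannot switch to
    # other tasks until this returns.
    time.sleep(1)
    print("blocking work finished")

async def heartbeat():
    # Would normally print roughly every 0.2 seconds, but cannot even
    # start while blocking_work() holds the loop.
    for i in range(3):
        print(f"heartbeat {i}")
        await asyncio.sleep(0.2)

async def main():
    await asyncio.gather(blocking_work(), heartbeat())

asyncio.run(main())
```

Running this prints "blocking work finished" after about a second before any heartbeat appears, which is exactly the unresponsiveness a blocked FastAPI server exhibits.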
Consider a typical ML prediction endpoint:
```python
# Assume 'model' is a loaded ML model (e.g., scikit-learn)
# Assume 'preprocess_input' and 'format_output' exist

# Problematic Approach: Blocking the event loop
@app.post("/predict_blocking")
async def predict_blocking(data: InputData):  # InputData is a Pydantic model
    processed_data = preprocess_input(data)
    # This line BLOCKS the event loop if model.predict is CPU-bound
    prediction = model.predict(processed_data)
    results = format_output(prediction)
    return {"prediction": results}
```
In this example, if `model.predict()` takes several hundred milliseconds or even seconds to run (common for complex models or large inputs), the entire FastAPI application will be unresponsive during that time.
FastAPI provides a clean way to handle this situation by running blocking, CPU-bound code in a separate thread pool. This allows the main event loop to remain unblocked and continue handling other requests while the heavy computation occurs in another thread.
The key utility here is `run_in_threadpool`, a function provided by Starlette (the underlying ASGI toolkit FastAPI uses) and readily available in FastAPI. You `await` this function, passing it the blocking function you want to execute along with its arguments.
Here's how to refactor the previous example correctly:
```python
from fastapi.concurrency import run_in_threadpool
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# Assume 'model' is loaded and 'preprocess_input', 'format_output' exist

# Example placeholder for the blocking function
def run_model_inference(processed_data):
    # Simulate a CPU-bound task
    import time
    time.sleep(0.5)  # Represents model.predict() time
    # In reality: prediction = model.predict(processed_data)
    prediction = [1]  # Placeholder result
    return prediction

# Define input data model
class InputData(BaseModel):
    feature1: float
    feature2: float

# Correct Approach: Using run_in_threadpool
@app.post("/predict_non_blocking")
async def predict_non_blocking(data: InputData):
    # Preprocessing can often be async if it involves I/O,
    # but here we assume it's synchronous CPU work or quick.
    processed_data = preprocess_input(data)  # Assume this returns needed format

    # Offload the blocking call to the thread pool
    # Pass the function and its arguments
    prediction = await run_in_threadpool(run_model_inference, processed_data)

    # Postprocessing
    results = format_output(prediction)
    return {"prediction": results}

# Dummy implementations for completeness
def preprocess_input(data: InputData): return [[data.feature1, data.feature2]]
def format_output(prediction): return prediction[0]
```
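To see the endpoint working end to end, you can exercise it with FastAPI's `TestClient`; this is only a quick local check, and the file name `main.py` and feature values below are illustrative, not part of the example above:

```python
# Quick local check using FastAPI's TestClient.
# Assumes the example code above is saved as main.py (illustrative name).
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)
response = client.post(
    "/predict_non_blocking",
    json={"feature1": 1.0, "feature2": 2.0},
)
print(response.status_code)  # 200
print(response.json())       # {"prediction": 1} with the placeholder model
```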
In `predict_non_blocking`, the call `await run_in_threadpool(run_model_inference, processed_data)` does the following:

1. It schedules the `run_model_inference` function (which contains the blocking `model.predict()` call) to be executed in a separate thread managed by a thread pool, leaving the event loop free to serve other requests in the meantime.
2. Once the `run_model_inference` function completes in its thread, `run_in_threadpool` retrieves the result.
3. The `await` completes, and the execution of the `predict_non_blocking` function resumes with the `prediction` result.

Diagram illustrating how `run_in_threadpool` prevents blocking the event loop compared to a direct call.
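For comparison, the same offloading mechanism can be expressed with the standard library alone. The sketch below uses `asyncio`'s `loop.run_in_executor()` and the `run_model_inference` helper defined earlier; it is shown only to illustrate what "run in a worker thread and await the result" means, and inside FastAPI routes `run_in_threadpool` remains the more convenient choice:

```python
import asyncio

async def predict_via_executor(processed_data):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor;
    # positional arguments follow the callable.
    prediction = await loop.run_in_executor(None, run_model_inference, processed_data)
    return prediction
```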
When to use run_in_threadpool

The primary use case for `run_in_threadpool` within an `async def` route is CPU-bound synchronous code that you cannot easily make asynchronous (like most standard ML library inference calls).
Use it for:

- CPU-bound model inference calls: `model.predict()`, `model.transform()` from libraries like scikit-learn, TensorFlow (in session run mode), or PyTorch (without specific async support).

Do NOT use it for:

- I/O-bound operations: use native `async` libraries (like `httpx` for HTTP requests, `asyncpg` or `databases` for databases) and `await` them directly, as in the sketch after this list. Wrapping I/O operations in `run_in_threadpool` adds unnecessary thread overhead and doesn't leverage the efficiency of the event loop for I/O.
- Functions already defined with `async def`. Awaiting an `async def` function directly is the standard way to run it.
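As a sketch of that distinction, the hypothetical route below (reusing `app`, `InputData`, and the helpers from the earlier example) awaits an async HTTP call directly and reserves the thread pool for the CPU-bound inference; the enrichment service and URL are made up for illustration:

```python
import httpx

@app.post("/predict_with_enrichment")
async def predict_with_enrichment(data: InputData):
    # I/O-bound: await a native async client directly; no thread pool needed.
    async with httpx.AsyncClient() as client:
        # Hypothetical feature-enrichment service, for illustration only.
        enrichment = await client.get("https://example.com/features")

    processed_data = preprocess_input(data)
    # CPU-bound: offload to the thread pool so the event loop stays free.
    prediction = await run_in_threadpool(run_model_inference, processed_data)

    return {
        "prediction": format_output(prediction),
        "enrichment_status": enrichment.status_code,
    }
```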
By correctly identifying and offloading blocking CPU-bound operations using `run_in_threadpool`, you ensure that your FastAPI application remains responsive and can effectively handle concurrent requests, even when performing computationally intensive machine learning inference. This is a standard pattern for integrating synchronous ML workflows into modern asynchronous web frameworks.