Let's put the theory of asynchronous operations into practice. In this section, we'll refactor existing synchronous code and implement new features using async/await, run_in_threadpool for blocking tasks, and background tasks. This hands-on exercise will solidify your understanding of how to improve the responsiveness and efficiency of your FastAPI ML application.
We'll start with a basic, synchronous endpoint that simulates both a potentially slow I/O operation (like fetching feature definitions from an external source) and a CPU-intensive model prediction step.
Imagine you have an endpoint that first needs to fetch some configuration or metadata related to the request (simulated I/O) and then runs a model prediction (simulated CPU work).
import time

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# Simulate a blocking function (e.g., CPU-bound ML inference)
def run_model_prediction(data_point: float) -> float:
    print("Starting model prediction...")
    # Simulate computation time
    time.sleep(1)
    result = data_point * 2  # Simple dummy operation
    print("Finished model prediction.")
    return result

# Simulate a blocking I/O call (e.g., database query, external API call)
def fetch_external_data(item_id: int) -> dict:
    print(f"Fetching external data for item {item_id}...")
    # Simulate I/O delay
    time.sleep(0.5)
    print("Finished fetching external data.")
    # Return some dummy data
    return {"item_id": item_id, "metadata": "some_fetched_value"}

class PredictionRequest(BaseModel):
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_sync", response_model=PredictionResponse)
def predict_synchronously(request: PredictionRequest):
    print("Received prediction request.")
    # Step 1: Fetch external data (Blocking I/O)
    external_data = fetch_external_data(request.item_id)
    # Step 2: Run model prediction (Blocking CPU)
    prediction_result = run_model_prediction(request.feature_value)
    print("Sending response.")
    return PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result
    )

# To run this: uvicorn your_module_name:app --reload
# Send a POST request to http://127.0.0.1:8000/predict_sync
# Body: {"item_id": 123, "feature_value": 5.0}
If you run this application and send multiple requests concurrently (e.g., using a load-testing tool like ab, or the small client script sketched below), you'll see how the blocking time.sleep calls limit throughput: each request occupies a worker thread for its full 1.5-second duration, and once all available workers are busy, additional requests queue up and wait until earlier ones have fully completed.
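If you'd like to measure this yourself, a small client script along the lines of the sketch below can fire several requests at once and report the total elapsed time. It assumes the server above is running locally and that the httpx library (not used elsewhere in this section) is installed; the URL and payload match the example above.

# Hypothetical timing check: send several concurrent requests and measure the total time.
# Assumes the server above is running on 127.0.0.1:8000 and httpx is installed (pip install httpx).
import asyncio
import time

import httpx

URL = "http://127.0.0.1:8000/predict_sync"  # change to the other endpoints later to compare
PAYLOAD = {"item_id": 123, "feature_value": 5.0}

async def send_one(client: httpx.AsyncClient) -> None:
    response = await client.post(URL, json=PAYLOAD)
    response.raise_for_status()

async def main(n_requests: int = 5) -> None:
    start = time.perf_counter()
    async with httpx.AsyncClient(timeout=30.0) as client:
        await asyncio.gather(*(send_one(client) for _ in range(n_requests)))
    elapsed = time.perf_counter() - start
    # Compare the elapsed time for /predict_sync against the asynchronous
    # endpoints built later in this section to see how blocking calls affect throughput.
    print(f"{n_requests} requests completed in {elapsed:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())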
FastAPI shines when dealing with I/O-bound operations because async/await allows the server to switch contexts while waiting for I/O, handling other requests in the meantime. Let's modify the fetch_external_data function and the endpoint to be asynchronous. We'll use asyncio.sleep to simulate non-blocking I/O.
import asyncio  # Import asyncio
import time

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()  # Assume FastAPI instance exists

# Simulate a non-blocking I/O call
async def fetch_external_data_async(item_id: int) -> dict:  # Note 'async def'
    print(f"Fetching external data async for item {item_id}...")
    # Simulate non-blocking I/O delay
    await asyncio.sleep(0.5)  # Note 'await asyncio.sleep'
    print("Finished fetching external data async.")
    return {"item_id": item_id, "metadata": "some_fetched_value_async"}

# Keep the blocking model prediction function as is for now
def run_model_prediction(data_point: float) -> float:
    print("Starting model prediction...")
    time.sleep(1)  # Still blocking
    result = data_point * 2
    print("Finished model prediction.")
    return result

class PredictionRequest(BaseModel):  # Assume defined
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):  # Assume defined
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_async_io", response_model=PredictionResponse)
async def predict_with_async_io(request: PredictionRequest):  # Note 'async def'
    print("Received async I/O prediction request.")
    # Step 1: Fetch external data (Non-blocking I/O)
    external_data = await fetch_external_data_async(request.item_id)  # Note 'await'
    # Step 2: Run model prediction (Still Blocking CPU - problematic!)
    # This will still block the event loop when called directly!
    prediction_result = run_model_prediction(request.feature_value)
    print("Sending response.")
    return PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result
    )
In this version, the fetch_external_data_async call is now non-blocking. While the await asyncio.sleep(0.5) happens, the FastAPI server (running on an ASGI server like Uvicorn) can handle other incoming requests or tasks. However, we still have a problem: run_model_prediction uses time.sleep(1), which is a blocking call. Even within an async def endpoint, calling a blocking function directly like this will halt the event loop, negating the benefits of async for concurrency during that phase.
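One way to observe the blocked loop directly is to add a small heartbeat task to the application. The sketch below is an optional diagnostic, not part of the endpoint itself; it assumes the app instance from the example above and uses the startup event hook for brevity (newer FastAPI versions prefer a lifespan handler). While run_model_prediction's time.sleep(1) runs inside the async endpoint, the heartbeat output pauses.

# Optional diagnostic sketch: a heartbeat that reveals when the event loop is blocked.
# If the "event loop alive" messages stop printing while /predict_async_io handles a
# request, the blocking time.sleep(1) inside run_model_prediction is stalling the loop.
import asyncio

_heartbeat_task = None  # keep a reference so the task is not garbage-collected

async def _heartbeat() -> None:
    while True:
        print("event loop alive")
        await asyncio.sleep(0.25)

@app.on_event("startup")  # newer FastAPI versions prefer a lifespan handler
async def start_heartbeat() -> None:
    global _heartbeat_task
    _heartbeat_task = asyncio.create_task(_heartbeat())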
Offloading Blocking Work with run_in_threadpool
To properly handle the CPU-bound run_model_prediction within our asynchronous endpoint, we need to delegate it to a separate thread pool managed by FastAPI/Starlette. This prevents the main event loop from being blocked.
import asyncio
import time

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool  # Import run_in_threadpool
from pydantic import BaseModel

app = FastAPI()  # Assume FastAPI instance exists

# Non-blocking I/O simulation
async def fetch_external_data_async(item_id: int) -> dict:
    print(f"Fetching external data async for item {item_id}...")
    await asyncio.sleep(0.5)
    print("Finished fetching external data async.")
    return {"item_id": item_id, "metadata": "some_fetched_value_async"}

# Blocking CPU-bound simulation - unchanged
def run_model_prediction(data_point: float) -> float:
    print("Starting model prediction (in thread pool)...")
    time.sleep(1)
    result = data_point * 2
    print("Finished model prediction (in thread pool).")
    return result

class PredictionRequest(BaseModel):  # Assume defined
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):  # Assume defined
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_full_async", response_model=PredictionResponse)
async def predict_fully_asynchronous(request: PredictionRequest):  # async def
    print("Received full async prediction request.")
    # Step 1: Fetch external data (Non-blocking I/O)
    external_data = await fetch_external_data_async(request.item_id)  # await
    # Step 2: Run model prediction (Blocking CPU, run in thread pool)
    # Use run_in_threadpool to avoid blocking the event loop
    prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value)  # await + run_in_threadpool
    print("Sending response.")
    return PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result
    )
Now, when predict_fully_asynchronous is called:

1. It awaits the non-blocking I/O call (fetch_external_data_async). The event loop is free during this time.
2. It awaits run_in_threadpool(run_model_prediction, ...). This submits the run_model_prediction function to execute in a separate thread from the thread pool. The event loop is again free to handle other tasks while the model prediction runs in the background thread.
3. When the background thread finishes run_model_prediction, the result is returned, and the execution of predict_fully_asynchronous resumes.

This approach correctly leverages asynchronous programming for I/O and safely integrates blocking CPU-bound code without stalling the server.
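For comparison, run_in_threadpool is essentially a convenience wrapper around the thread pool Starlette manages. If you ever need the same offloading pattern outside FastAPI, a roughly equivalent standard-library sketch looks like the following; it reuses run_model_prediction from above, and the function name predict_via_executor is just illustrative.

# Alternative sketch: offload the same blocking call with the standard library only.
# Inside FastAPI, run_in_threadpool is the more idiomatic choice because it uses the
# thread pool Starlette already manages; this is shown purely for comparison.
import asyncio

async def predict_via_executor(feature_value: float) -> float:
    loop = asyncio.get_running_loop()
    # None selects the event loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, run_model_prediction, feature_value)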
Suppose after returning the prediction, we want to log the request and result to a file or database without making the client wait. This is a perfect use case for background tasks.
import asyncio
import time

from fastapi import FastAPI, BackgroundTasks  # Import BackgroundTasks
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

app = FastAPI()  # Assume FastAPI instance exists

# Functions from previous example (async I/O, blocking CPU)
async def fetch_external_data_async(item_id: int) -> dict:  # ... (as before)
    print(f"Fetching external data async for item {item_id}...")
    await asyncio.sleep(0.5)
    print("Finished fetching external data async.")
    return {"item_id": item_id, "metadata": "some_fetched_value_async"}

def run_model_prediction(data_point: float) -> float:  # ... (as before)
    print("Starting model prediction (in thread pool)...")
    time.sleep(1)
    result = data_point * 2
    print("Finished model prediction (in thread pool).")
    return result

# Function to be run in the background
def log_prediction_details(request_data: dict, response_data: dict):
    # Simulate writing to a log file or database
    print("\n--- Background Task Started ---")
    print(f"Logging Request: {request_data}")
    print(f"Logging Response: {response_data}")
    # Simulate some work for the background task
    time.sleep(0.2)
    print("--- Background Task Finished ---\n")
    # In a real app, you'd write to a file, DB, or send to a logging service.

class PredictionRequest(BaseModel):  # Assume defined
    item_id: int
    feature_value: float

class PredictionResponse(BaseModel):  # Assume defined
    item_id: int
    metadata: str
    prediction: float

@app.post("/predict_with_background", response_model=PredictionResponse)
async def predict_with_background_task(
    request: PredictionRequest,
    background_tasks: BackgroundTasks  # Inject BackgroundTasks
):
    print("Received prediction request with background task.")
    # Step 1: Fetch external data (Non-blocking I/O)
    external_data = await fetch_external_data_async(request.item_id)
    # Step 2: Run model prediction (Blocking CPU, run in thread pool)
    prediction_result = await run_in_threadpool(run_model_prediction, request.feature_value)
    response = PredictionResponse(
        item_id=external_data["item_id"],
        metadata=external_data["metadata"],
        prediction=prediction_result
    )
    # Add the logging task to run AFTER the response is sent
    background_tasks.add_task(
        log_prediction_details,
        request.dict(),   # Pass data to the task (use .model_dump() with Pydantic v2)
        response.dict()   # Pass data to the task (use .model_dump() with Pydantic v2)
    )
    print("Sending response (background task pending).")
    return response  # Response is sent here
In this final example:

1. We import BackgroundTasks from FastAPI.
2. We declare background_tasks: BackgroundTasks as a parameter to our endpoint function. FastAPI automatically injects an instance.
3. We define log_prediction_details, which contains the logic we want to run in the background. Note that this function itself can be blocking or async, but if it's blocking and long-running, it might still consume resources. For significant background work, dedicated task queues (like Celery) are often preferred.
4. We call background_tasks.add_task(), passing the function to run and its arguments.
5. The endpoint returns the response to the client.
6. log_prediction_details is executed after the response has been successfully sent. You will see its print statements appear in the server console after the "Sending response" message.

This practice demonstrates how to combine async/await for I/O, run_in_threadpool for CPU-bound tasks, and BackgroundTasks for post-response processing, creating more performant and responsive FastAPI applications for serving machine learning models. Experiment with these patterns by running the server and sending requests to observe the timing and flow of execution.
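For a quick manual check, a single request like the sketch below (again assuming the server is running locally and the httpx library is installed) should return the prediction after roughly 1.5 seconds, while the background task's log lines show up in the server console only after the response has gone out.

# Quick manual check of the background-task endpoint (assumes httpx is installed
# and the server above is running on 127.0.0.1:8000).
import httpx

response = httpx.post(
    "http://127.0.0.1:8000/predict_with_background",
    json={"item_id": 123, "feature_value": 5.0},
    timeout=30.0,
)
# The client gets the prediction immediately; the "--- Background Task ---"
# messages appear in the *server* console shortly afterwards.
print(response.status_code, response.json())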