FastAPI and async SQLAlchemy 2.0 with pytest done right

Posted on Sun 19 March 2023 in FastAPI

Introduction

In this post we're going to see how to use FastAPI together with SQLAlchemy 2.0's new async features, and how to configure pytest to run async database tests. I've built this project using Python 3.11, so it might not work on earlier versions; however, you should be able to adapt it if needed.

Requirements

Get the code from the GitHub repository.

For this post we're going to use poetry version 1.4.0 or higher to manage our dependencies. If you don't have it installed, you can install it with pip:

pip install poetry

In the pyproject.toml file you can see the dependencies we're going to use. The file defines three dependency groups: the main dependencies, the development dependencies, and the test dependencies.

It also contains some general settings for the project, such as the pytest configuration (note the asyncio_mode = "auto" setting, which lets pytest detect and run async tests and fixtures without explicit markers) and the mypy configuration.
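For reference, the relevant pytest section of that file looks roughly like this (check the repository for the exact contents):

# pyproject.toml (excerpt)
[tool.pytest.ini_options]
asyncio_mode = "auto"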

You might notice that we have both psycopg (a.k.a. psycopg3) and asyncpg installed. The reason for this is that one of our test dependencies, pytest-postgresql, requires psycopg. However, we're going to use asyncpg to connect to the database from the main application, so we need both. Installing them side by side doesn't cause any issues, and psycopg won't be installed in production anyway, since it only lives in the test dependency group.

I've opted for asyncpg in production mainly because of its higher performance and community support. However, you can just use psycopg if you prefer, since it also supports async (and our tests exercise the real async code paths rather than patching them out).
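If you do decide to stay on psycopg, the only thing that changes is the dialect part of the connection URL; both of the forms below (with placeholder credentials) work with SQLAlchemy's create_async_engine:

# async PostgreSQL connection URLs in SQLAlchemy 2.0
"postgresql+asyncpg://user:password@host:5432/dbname"  # asyncpg, used in this post
"postgresql+psycopg://user:password@host:5432/dbname"  # psycopg 3 in async mode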

Start by running poetry install to install the dependencies and generate a new virtual environment. After that, you can run poetry shell to activate the virtual environment.

Setting up the database

We're going to use FastAPI to create a simple API that will allow us to create and retrieve users from a database. The main point here is to show how to use async SQLAlchemy 2.0 with FastAPI, so we're not going to focus on the API itself.

Let's start by creating a configuration file that will hold our database connection string:

# app/config.py
import os

class Config:
    DB_CONFIG = os.getenv(
        "DB_CONFIG",
        "postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}".format(
            DB_USER=os.getenv("DB_USER", "fastapi"),
            DB_PASSWORD=os.getenv("DB_PASSWORD", "fastapi-password"),
            DB_HOST=os.getenv("DB_HOST", "fastapi-postgresql:5432"),
            DB_NAME=os.getenv("DB_NAME", "fastapi"),
        ),
    )


config = Config

This configuration file will be used to get the database connection string from the environment variables. Notice that we're using the postgresql+asyncpg dialect, which is the one that will allow us to use async SQLAlchemy 2.0.
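With the default values above, the connection string resolves to:

postgresql+asyncpg://fastapi:fastapi-password@fastapi-postgresql:5432/fastapi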

I'm not going to cover how to start a local PostgreSQL database in this post, but you can, for example, use the official PostgreSQL Docker image to start a local database. Just make sure to set the environment variables DB_USER, DB_PASSWORD, DB_HOST and DB_NAME accordingly.
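For example, a throwaway local database matching the defaults above could be started along these lines (the container name and image tag here are just an illustration):

docker run --name fastapi-postgresql -d -p 5432:5432 \
    -e POSTGRES_USER=fastapi \
    -e POSTGRES_PASSWORD=fastapi-password \
    -e POSTGRES_DB=fastapi \
    postgres:15

With the port published on localhost like this, you would set DB_HOST=localhost:5432 instead of the fastapi-postgresql:5432 default.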

Then, we need to set up how the database connection and session handling will work. This is where the special async configuration of SQLAlchemy 2.0 comes into play.

Let's start with the import statements:

# app/services/database.py
import contextlib
from typing import AsyncIterator

from fastapi import Depends
from sqlalchemy.ext.asyncio import (AsyncConnection, AsyncEngine, AsyncSession,
                                    async_sessionmaker, create_async_engine)
from sqlalchemy.orm import declarative_base

Notice that we're importing the async versions of the SQLAlchemy classes and factories.

Then, let's create a session manager for our database. This class will be used as a singleton and will be responsible for abstracting the database connection and session handling:

# app/services/database.py
Base = declarative_base()

class DatabaseSessionManager:
    def __init__(self):
        self._engine: AsyncEngine | None = None
        self._sessionmaker: async_sessionmaker | None = None

    def init(self, host: str):
        self._engine = create_async_engine(host)
        self._sessionmaker = async_sessionmaker(bind=self._engine)

    async def close(self):
        if self._engine is None:
            raise Exception("DatabaseSessionManager is not initialized")
        await self._engine.dispose()
        self._engine = None
        self._sessionmaker = None

    @contextlib.asynccontextmanager
    async def connect(self) -> AsyncIterator[AsyncConnection]:
        if self._engine is None:
            raise Exception("DatabaseSessionManager is not initialized")

        async with self._engine.begin() as connection:
            try:
                yield connection
            except Exception:
                await connection.rollback()
                raise

    @contextlib.asynccontextmanager
    async def session(self) -> AsyncIterator[AsyncSession]:
        if self._sessionmaker is None:
            raise Exception("DatabaseSessionManager is not initialized")

        session = self._sessionmaker()
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

    # Used for testing
    async def create_all(self, connection: AsyncConnection):
        await connection.run_sync(Base.metadata.create_all)

    async def drop_all(self, connection: AsyncConnection):
        await connection.run_sync(Base.metadata.drop_all)

sessionmanager = DatabaseSessionManager()

Notice that we're applying the await keyword to the async methods of the SQLAlchemy classes. The engine is created with create_async_engine, which returns an AsyncEngine, and the session factory is created with async_sessionmaker, which produces AsyncSession objects whose commit and rollback methods are coroutines.
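For completeness, this is roughly how the singleton is meant to be used on its own, outside of FastAPI's dependency injection (a sketch, not part of the application code):

# sketch: using the session manager directly
import asyncio

from app.config import config
from app.services.database import sessionmanager


async def main():
    sessionmanager.init(config.DB_CONFIG)
    async with sessionmanager.session() as session:
        ...  # run queries through the AsyncSession here
    await sessionmanager.close()


asyncio.run(main())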

Then, we need to create a FastAPI dependency that yields a database session for each request; we'll inject it into the API views:

# app/services/database.py
async def get_db():
    async with sessionmanager.session() as session:
        yield session

And we're done with the base database configuration. Now we can create the database models:

# app/models.py
from uuid import uuid4

from sqlalchemy import Column, String, select
from sqlalchemy.exc import IntegrityError, NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession

from app.services.database import Base


class User(Base):
    __tablename__ = "users"
    id = Column(String, primary_key=True)
    email = Column(String, unique=True, nullable=False)
    full_name = Column(String, nullable=False)

    @classmethod
    async def create(cls, db: AsyncSession, id=None, **kwargs):
        if not id:
            id = uuid4().hex

        transaction = cls(id=id, **kwargs)
        db.add(transaction)
        await db.commit()
        await db.refresh(transaction)
        return transaction

    @classmethod
    async def get(cls, db: AsyncSession, id: str):
        try:
            transaction = await db.get(cls, id)
        except NoResultFound:
            return None
        return transaction

    @classmethod
    async def get_all(cls, db: AsyncSession):
        return (await db.execute(select(cls))).scalars().all()

Here we have a simple User model with create, get and get_all class methods, which our API views will use to create, retrieve and list users. Notice that they are all async methods, and that every database operation is awaited.
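One thing to keep in mind: email has a unique constraint, so committing a duplicate will raise IntegrityError (hence the import above). A caller could guard against that along these lines; create_user_if_new is a hypothetical helper, not part of the project, and it relies on the same imports as models.py:

# sketch: tolerating duplicate emails with a hypothetical helper
async def create_user_if_new(db: AsyncSession, **kwargs):
    try:
        return await User.create(db, **kwargs)
    except IntegrityError:
        # the failed transaction must be rolled back before the session is reused
        await db.rollback()
        return None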

Creating the API views

Now that we have the database configuration and models set up, we can create the API views. Let's start by creating a user module:

# app/views/user.py
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from app.services.database import get_db

from ..models import User as UserModel

router = APIRouter(prefix="/user", tags=["user"])


class UserSchemaBase(BaseModel):
    email: str | None = None
    full_name: str | None = None

class UserSchemaCreate(UserSchemaBase):
    pass


class UserSchema(UserSchemaBase):
    id: str

    class Config:
        orm_mode = True


@router.get("/get-user", response_model=UserSchema)
async def get_user(id: str, db: AsyncSession = Depends(get_db)):
    user = await UserModel.get(db, id)
    return user


@router.get("/get-users", response_model=list[UserSchema])
async def get_users(db: AsyncSession = Depends(get_db)):
    users = await UserModel.get_all(db)
    return users


@router.post("/create-user", response_model=UserSchema)
async def create_user(user: UserSchemaCreate, db: AsyncSession = Depends(get_db)):
    user = await UserModel.create(db, **user.dict())
    return user

Here we have a simple FastAPI router with three API views: get_user, get_users and create_user. Notice that we're using FastAPI's Depends to inject the async database session into each view.
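The orm_mode = True setting on UserSchema is what allows the views to return SQLAlchemy objects directly: FastAPI validates them against the response_model, which with Pydantic v1 boils down to roughly this (an illustration, not code you need to write):

# sketch: what orm_mode = True enables (Pydantic v1 API)
db_user = UserModel(id="abc123", email="test@example.com", full_name="Test User")
schema = UserSchema.from_orm(db_user)  # reads attributes instead of dict keys
print(schema.dict())  # {'email': 'test@example.com', 'full_name': 'Test User', 'id': 'abc123'}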

Setting up FastAPI

Now that we have the API views set up, we can create the FastAPI application.

# app/__init__.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

from app.config import config
from app.services.database import sessionmanager


def init_app(init_db=True):
    lifespan = None

    if init_db:
        sessionmanager.init(config.DB_CONFIG)

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            yield
            if sessionmanager._engine is not None:
                await sessionmanager.close()

    server = FastAPI(title="FastAPI server", lifespan=lifespan)

    from app.views.user import router as user_router

    server.include_router(user_router, prefix="/api", tags=["user"])

    return server

Here we have a simple function that creates the FastAPI application. We're also initializing the database session manager here, and registering a lifespan handler that closes the database connection pool when the application shuts down.

The reason we have the init_db parameter is that we want to be able to create the FastAPI application without initializing the database connection. This is useful for testing, where the database is initialized differently.

We can now run mypy to check if all our type annotations are correct:

$ mypy app
Success: no issues found in 7 source files

This should be enough for setting up a FastAPI application with SQLAlchemy 2.0. However, we still need to see how to run the application and how to test it. Before we can run it, we first need to create the database tables, so let's see how to do that using Alembic.

Migrations with Alembic

In our repository, I've already included the alembic configuration. However, I'll show you how to set up alembic from scratch. If you want to follow along, simply delete the alembic directory and the alembic.ini file.

To start with alembic, we can use the alembic init command to create the alembic configuration. We'll use the async template for this:

$ alembic init -t async alembic

This will create the alembic directory with the alembic configuration. We'll need to make a few changes to the configuration.

First, we need to import our database models so that they're registered on the Base.metadata object; a table is added to the metadata when its model class (inheriting from Base) is defined, so importing the models module before Alembic reads the metadata is enough. Then, we need to set the sqlalchemy.url configuration to our database connection string. And finally, we point target_metadata to our Base.metadata object.

Below I'll show the changes we need to make to the alembic/env.py file:

# alembic/env.py
from app import models
from app.config import config as app_config
from app.services.database import Base

config = context.config
config.set_main_option("sqlalchemy.url", app_config.DB_CONFIG)
target_metadata = Base.metadata

Then, we're able to run the alembic revision command to create a new revision:

$ alembic revision --autogenerate -m "Adding user model"

This will create a new revision file in the alembic/versions directory. We can then run the alembic upgrade head command to apply the migration to the database:

$ alembic upgrade head
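For reference, the upgrade step of the revision we just generated and applied should look roughly like this for our User model (revision identifiers and the exact formatting will differ):

# alembic/versions/xxxx_adding_user_model.py (excerpt)
def upgrade() -> None:
    op.create_table(
        "users",
        sa.Column("id", sa.String(), nullable=False),
        sa.Column("email", sa.String(), nullable=False),
        sa.Column("full_name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("email"),
    )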

Starting the server

To start the server, run uvicorn run:server --reload. This will start the server on port 8000 by default. The docs will be available at http://localhost:8000/docs. You should be able to see and run any of the API views that we've created.
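The run:server part assumes a small run.py module at the repository root that builds the application object; if you're assembling the project yourself rather than cloning the repository, a minimal version would be:

# run.py
from app import init_app

server = init_app()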

This should be enough to start using FastAPI with SQLAlchemy 2.0. However, one important component of software development is testing, so let's see how we can test our API views.

Testing the API views

Here I'm mostly interested in showing how to do integration testing with FastAPI and SQLAlchemy 2.0. This means that our tests will call the API views and check the responses. We won't be testing the database models directly, but a similar setup would work for that as well.

Let's start by creating a conftest.py file in the root of our tests/integration directory. This file will be responsible for setting up the test database and creating the FastAPI application for testing. Since this is an intricate setup, let's again break it down into smaller pieces. We'll start with the imports:

# tests/integration/conftest.py
import asyncio
from contextlib import ExitStack

import pytest
from fastapi.testclient import TestClient
from pytest_postgresql import factories
from pytest_postgresql.janitor import DatabaseJanitor

from app import init_app
from app.models import User
from app.services.database import get_db, sessionmanager

There isn't much going on here. We're importing the necessary packages, plus the init_app function from our application. Let's move on and create our app and client fixtures, used to build the FastAPI test application and test client:

# tests/integration/conftest.py
@pytest.fixture(autouse=True)
def app():
    with ExitStack():
        yield init_app(init_db=False)


@pytest.fixture
def client(app):
    with TestClient(app) as c:
        yield c

This is very standard code so far. Notice that we passed init_db=False to the init_app function. This is because we want to initialize the database connection manually, so that we can create the test database.

Now we'll use the pytest-postgresql package to create a test database. This package will create a test database for us, and will also clean it up after the tests are done.

# tests/integration/conftest.py
test_db = factories.postgresql_proc(port=None, dbname="test_db")

And now we're ready to create the database connection and session. Our test connection will be session-scoped (in the pytest sense), so the same engine is shared across all the tests; it's best practice to avoid creating a new connection for every test, let alone every request.

However, we'll need to define a new session-scoped event loop fixture, because the default event loop fixture provided by pytest-asyncio is function-scoped. Let's create the event loop and connection fixtures:

# tests/integration/conftest.py
@pytest.fixture(scope="session")
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="session", autouse=True)
async def connection_test(test_db, event_loop):
    pg_host = test_db.host
    pg_port = test_db.port
    pg_user = test_db.user
    pg_db = test_db.dbname
    pg_password = test_db.password

    with DatabaseJanitor(
        pg_user, pg_host, pg_port, pg_db, test_db.version, pg_password
    ):
        connection_str = f"postgresql+psycopg://{pg_user}:@{pg_host}:{pg_port}/{pg_db}"
        sessionmanager.init(connection_str)
        yield
        await sessionmanager.close()

The DatabaseJanitor manages the state of the test database, but we still need to create the connection and session ourselves. Here we're initializing our sessionmanager singleton with the connection settings provided by the postgresql_proc fixture. After our tests have finished, we call the close method to dispose of the async database engine.

Finally, let's create a function-scoped fixture that will handle creating the database session for each test:

# tests/integration/conftest.py
@pytest.fixture(scope="function", autouse=True)
async def create_tables(connection_test):
    async with sessionmanager.connect() as connection:
        await sessionmanager.drop_all(connection)
        await sessionmanager.create_all(connection)


@pytest.fixture(scope="function", autouse=True)
async def session_override(app, connection_test):
    async def get_db_override():
        async with sessionmanager.session() as session:
            yield session

    app.dependency_overrides[get_db] = get_db_override

Here we're creating the database tables from scratch for each test. Having this done in an isolated connection will ensure that these operations are finished before our tests run. We also don't have to bother with cleaning up the database after the tests are done, because the DatabaseJanitor will take care of that for us.

We're also overriding the get_db FastAPI dependency we created earlier with a new dependency that returns our test session. As with the original get_db dependency, this ensures that each call to a FastAPI endpoint gets a fresh, isolated session. And since our app fixture is function-scoped, the override is applied to a brand-new application instance for every test.

And that's it! We're now ready to write our first test.

# tests/integration/test_create_user.py
def test_create_user(client):
    response = client.get("/api/user/get-users")
    assert response.status_code == 200
    assert response.json() == []

    response = client.post(
        "/api/user/create-user",
        json={"email": "[email protected]", "full_name": "Full Name Test"},
    )
    assert response.status_code == 200

    response = client.get(f"/api/user/get-user?id={response.json().get('id')}")
    assert response.status_code == 200
    assert response.json() == {
        "id": response.json().get("id"),
        "email": "[email protected]",
        "full_name": "Full Name Test",
    }

    response = client.get("/api/user/get-users")
    assert response.status_code == 200
    assert len(response.json()) == 1

This test is pretty straightforward. We're calling the get-users endpoint, which should return an empty list, since we haven't created any users yet. Then we're calling the create-user endpoint, and checking that the response is successful. Finally, we're calling the get-user endpoint, and checking that the response contains the data of the user we've just created. We're also checking that the get-users endpoint returns a list with a single user.
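Since the create_tables fixture drops and recreates the schema before every test, each test starts from an empty database. A second test like the one below (not part of the repository, purely to illustrate the isolation) would also pass, regardless of the order in which the tests run:

# tests/integration/test_isolation.py (illustrative)
def test_starts_with_empty_database(client):
    # users created in other tests are gone: the tables were just recreated
    response = client.get("/api/user/get-users")
    assert response.status_code == 200
    assert response.json() == []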

To run our newly created tests, we can just run the pytest command:

$ pytest tests/integration

Conclusion

In this article, we've learned how to create a FastAPI application connected with SQLAlchemy 2.0, and how to write tests for it. We've also seen how to use Alembic to create database migrations, and how to use the pytest-postgresql package to create a test database.