Skip to content

Connection & Registry

Functions for managing database connections and the global model registry. connect() registers an (optionally named) connection pool; reset_engine() tears everything down; the registry helpers control schema creation and the identity map. Sessionized routing is exposed via ferro.engines.session(name) / ferro.Session. See the Connections & Databases guide.

Session dataclass

Source code in src/ferro/session.py
@dataclass(slots=True)
class Session:
    connection_name: str | None = None
    session_id: str | None = None
    _token: Any = field(default=None, repr=False, compare=False)
    _enter_context: contextvars.Context | None = field(
        default=None, repr=False, compare=False
    )
    _enter_task: asyncio.Task[Any] | None = field(
        default=None, repr=False, compare=False
    )
    _close_lock: asyncio.Lock | None = field(default=None, repr=False, compare=False)

    async def __aenter__(self) -> "Session":
        self.session_id, resolved_name = _core_open_session(self.connection_name)
        if self.connection_name is None:
            self.connection_name = resolved_name
        self._token = _CURRENT_SESSION.set(self)
        self._enter_context = contextvars.copy_context()
        self._enter_task = asyncio.current_task()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        try:
            await self.close()
        except Exception as close_exc:
            if exc_type is None:
                raise
            if exc is not None:
                raise exc from close_exc
            raise close_exc

    async def close(self) -> None:
        """Close this session and release its runtime state.

        Safe to call from a different asyncio context than ``__aenter__``.
        Repeated calls are no-ops.

        Raises:
            RuntimeError: If the ambient session in this asyncio context does not
                match this handle (same-context lifecycle misuse), or if
                session-scoped transactions are still open.
        """
        if self._close_lock is None:
            self._close_lock = asyncio.Lock()

        async with self._close_lock:
            if self.session_id is None and self._token is None:
                return

            self._assert_close_allowed()

            if self.session_id is not None:
                session_id = self.session_id
                _core_close_session(session_id)
                self.session_id = None

            if self._token is None:
                self._enter_context = None
                self._enter_task = None
                return

            token = self._token
            self._token = None
            self._enter_context = None
            self._enter_task = None
            self._restore_ambient_session(token)

    def _assert_close_allowed(self) -> None:
        if self._token is None:
            return
        if asyncio.current_task() is not self._enter_task:
            return
        ambient = _CURRENT_SESSION.get()
        if ambient is self:
            return
        entered_ambient = (
            self._enter_context.get(_CURRENT_SESSION)
            if self._enter_context is not None
            else None
        )
        if entered_ambient is self and ambient is not None:
            raise RuntimeError(_SESSION_CLOSE_AMBIENT_MISMATCH)

    def _restore_ambient_session(self, token: Any) -> None:
        ambient = _CURRENT_SESSION.get()
        if ambient is not self:
            try:
                _CURRENT_SESSION.reset(token)
            except ValueError:
                return
            raise RuntimeError(_SESSION_CLOSE_AMBIENT_MISMATCH)
        try:
            _CURRENT_SESSION.reset(token)
        except ValueError:
            return

    def query(self, model_cls):
        from .query import Query

        return Query(model_cls, session=self)

Attributes

connection_name = None class-attribute instance-attribute

session_id = None class-attribute instance-attribute

Functions

__aenter__() async

Source code in src/ferro/session.py
async def __aenter__(self) -> "Session":
    """Open the session and make this handle the ambient session."""
    opened_id, opened_name = _core_open_session(self.connection_name)
    self.session_id = opened_id
    if self.connection_name is None:
        # No explicit name was given: adopt the one the core resolved.
        self.connection_name = opened_name
    # Publish first so the snapshot taken next already contains ``self``.
    self._token = _CURRENT_SESSION.set(self)
    self._enter_context = contextvars.copy_context()
    self._enter_task = asyncio.current_task()
    return self

__aexit__(exc_type, exc, tb) async

Source code in src/ferro/session.py
async def __aexit__(self, exc_type, exc, tb) -> None:
    """Close the session, keeping a body exception as the primary error.

    A failure in ``close()`` propagates unchanged when the ``async with`` body
    succeeded. When the body raised, that exception is re-raised with the
    close failure chained as its cause.
    """
    try:
        await self.close()
    except Exception as close_exc:
        if exc_type is None:
            raise
        if exc is None:
            raise close_exc
        raise exc from close_exc

close() async

Close this session and release its runtime state.

Safe to call from a different asyncio context than __aenter__. Repeated calls are no-ops.

Raises:

Type Description
RuntimeError

If the ambient session in this asyncio context does not match this handle (same-context lifecycle misuse), or if session-scoped transactions are still open.

Source code in src/ferro/session.py
async def close(self) -> None:
    """Close this session and release its runtime state.

    Safe to call from a different asyncio context than ``__aenter__``.
    Repeated calls are no-ops.

    Raises:
        RuntimeError: If the ambient session in this asyncio context does not
            match this handle (same-context lifecycle misuse), or if
            session-scoped transactions are still open.
    """
    # The lock is created on first use and then serializes concurrent closers.
    lock = self._close_lock
    if lock is None:
        lock = self._close_lock = asyncio.Lock()

    async with lock:
        nothing_to_release = self.session_id is None and self._token is None
        if nothing_to_release:
            return

        self._assert_close_allowed()

        open_id = self.session_id
        if open_id is not None:
            _core_close_session(open_id)
            self.session_id = None

        # Drop the entry bookkeeping before touching the context variable, so
        # the handle is already inert if the reset below raises.
        token, self._token = self._token, None
        self._enter_context = None
        self._enter_task = None
        if token is not None:
            self._restore_ambient_session(token)

query(model_cls)

Source code in src/ferro/session.py
def query(self, model_cls):
    """Return a ``Query`` for ``model_cls`` bound to this session."""
    # Imported at call time rather than module load (presumably to avoid an
    # import cycle with the query module).
    from .query import Query

    bound_query = Query(model_cls, session=self)
    return bound_query

__init__(connection_name=None, session_id=None, _token=None, _enter_context=None, _enter_task=None, _close_lock=None)

connect(url, auto_migrate=False, name=None, default=False, pool=None, *, identity_map=True, migrate_updates=False, migrate_destructive=False) async

Establish a connection to the database.

Parameters:

Name Type Description Default
url str

The database connection string (e.g., "sqlite:example.db?mode=rwc").

required
auto_migrate bool

If True, automatically create tables for all registered models. Existing tables are left untouched unless migrate_updates / migrate_destructive are also set.

False
name str | None

Optional connection name. Omitted connections register as "default".

None
default bool

If True, make this named connection the default for unqualified operations.

False
pool PoolConfig | None

Optional per-connection pool configuration.

None
identity_map bool

If True (default), keep a per-connection identity map so the same primary key maps to a single Python instance. If False, each load returns fresh instances and the map is not consulted (lower memory use; no `a is b` identity guarantees across loads).

True
migrate_updates bool

If True, additionally update existing tables to match the registered models. Implies auto_migrate. What this covers is capability-relative per backend:

  • Both backends: ALTER TABLE ... ADD COLUMN for model fields missing from the live table, using the same column DDL CREATE TABLE would emit (including single-column indexes and, on Postgres, CHECK constraints and foreign keys). NOT NULL fields need a literal default to backfill existing rows — connecting fails with a clear error otherwise.
  • Postgres only: column type changes (ALTER COLUMN ... TYPE ... USING cast) and nullability changes (SET/DROP NOT NULL) when the live column disagrees with the model.
  • SQLite: type/nullability drift cannot be changed in place; ferro emits a UserWarning naming the column and pointing at Alembic. (SQLite's type affinity makes declared-type drift mostly cosmetic.)

After any schema change, the connection pool is refreshed so no cached statement can observe the pre-migration schema.

False
migrate_destructive bool

If True, additionally drop live columns that no longer exist on the model (never whole tables). Implies migrate_updates. Dropping is dependency-aware: explicit indexes covering the column are dropped first; columns that are primary keys or enforced by table constraints fail with a clear error instead.

False

For schema changes beyond these (renames, primary-key changes, complex transforms), use the Alembic bridge — see docs/guide/migrations.md.

Source code in src/ferro/__init__.py
async def connect(
    url: str,
    auto_migrate: bool = False,
    name: str | None = None,
    default: bool = False,
    pool: PoolConfig | None = None,
    *,
    identity_map: bool = True,
    migrate_updates: bool = False,
    migrate_destructive: bool = False,
) -> None:
    """
    Open a database connection and register it with the engine.

    Args:
        url: Connection string for the database
            (e.g., "sqlite:example.db?mode=rwc").
        auto_migrate: When True, create tables for every registered model.
            Tables that already exist are not modified unless
            ``migrate_updates`` / ``migrate_destructive`` are set as well.
        name: Optional name for the connection. A connection without a name is
            registered as "default".
        default: When True, this named connection becomes the default used by
            unqualified operations.
        pool: Optional pool configuration for this connection; a default
            ``PoolConfig()`` is used when omitted.
        identity_map: When True (the default), a per-connection identity map
            keeps a single Python instance per primary key. When False, every
            load produces fresh instances and the map is bypassed (lower
            memory use; no ``a is b`` guarantees across loads).
        migrate_updates: When True, also bring existing tables in line with
            the registered models. Implies ``auto_migrate``. The exact
            coverage depends on the backend:

            - **Both backends**: ``ALTER TABLE ... ADD COLUMN`` for model
              fields the live table lacks, with the same column DDL that
              ``CREATE TABLE`` would emit (including single-column indexes
              and, on Postgres, CHECK constraints and foreign keys). A NOT
              NULL field needs a literal default to backfill existing rows —
              without one, connecting fails with a clear error.
            - **Postgres only**: column type changes
              (``ALTER COLUMN ... TYPE ... USING`` cast) and nullability
              changes (``SET/DROP NOT NULL``) wherever the live column
              disagrees with the model.
            - **SQLite**: type/nullability drift cannot be changed in place;
              ferro emits a ``UserWarning`` that names the column and points
              at Alembic. (SQLite's type affinity makes declared-type drift
              mostly cosmetic.)

            After any schema change the connection pool is refreshed, so no
            cached statement can observe the pre-migration schema.
        migrate_destructive: When True, also **drop** live columns that are no
            longer present on the model (never whole tables). Implies
            ``migrate_updates``. Dropping is dependency-aware: explicit
            indexes covering the column are dropped first, while columns that
            are primary keys or enforced by table constraints fail with a
            clear error instead.

    For schema changes beyond these (renames, primary-key changes, complex
    transforms), use the Alembic bridge — see ``docs/guide/migrations.md``.
    """
    from .relations import resolve_relationships

    # Relationships are resolved before the core connection is opened.
    resolve_relationships()

    settings = pool or PoolConfig()
    core_options = {
        "auto_migrate": auto_migrate,
        "name": name,
        "default": default,
        "max_connections": settings.max_connections,
        "min_connections": settings.min_connections,
        "identity_map": identity_map,
        "migrate_updates": migrate_updates,
        "migrate_destructive": migrate_destructive,
    }
    await _core_connect(url, **core_options)

PoolConfig

Bases: BaseModel

Connection pool settings for a named Ferro connection.

Source code in src/ferro/__init__.py
class PoolConfig(BaseModel):
    """Connection pool settings for a named Ferro connection."""

    # Instances are frozen: fields cannot be reassigned after construction.
    model_config = ConfigDict(frozen=True)

    # Upper bound on pool size; must be at least 1.
    max_connections: int = PydanticField(default=5, ge=1)
    # Lower bound on pool size; must be non-negative.
    min_connections: int = PydanticField(default=0, ge=0)

    @model_validator(mode="after")
    def validate(self) -> "PoolConfig":
        """Reject a configuration whose minimum exceeds its maximum.

        Returns:
            This instance, unchanged, when the bounds are consistent.

        Raises:
            ValueError: If ``min_connections`` is greater than
                ``max_connections``.
        """
        floor, ceiling = self.min_connections, self.max_connections
        if floor > ceiling:
            raise ValueError("min_connections cannot exceed max_connections")
        return self

Attributes

model_config = ConfigDict(frozen=True) class-attribute instance-attribute

max_connections = PydanticField(default=5, ge=1) class-attribute instance-attribute

min_connections = PydanticField(default=0, ge=0) class-attribute instance-attribute

Functions

validate()

Source code in src/ferro/__init__.py
@model_validator(mode="after")
def validate(self) -> "PoolConfig":
    """Reject a configuration whose minimum exceeds its maximum.

    Returns:
        This instance, unchanged, when the bounds are consistent.

    Raises:
        ValueError: If ``min_connections`` is greater than ``max_connections``.
    """
    floor, ceiling = self.min_connections, self.max_connections
    if floor > ceiling:
        raise ValueError("min_connections cannot exceed max_connections")
    return self

set_default_connection(name)

Source code in src/ferro/_core.pyi
# Type stub only. Presumably makes the connection registered under ``name``
# the default for unqualified operations, like ``connect(..., default=True)``
# — NOTE(review): confirm against the native implementation.
def set_default_connection(name: str) -> None: ...

reset_engine()

Source code in src/ferro/_core.pyi
# Type stub only. Tears down the engine state set up by ``connect()``.
# NOTE(review): the exact scope (pools, sessions, registry) is not visible
# from this stub — confirm against the native implementation.
def reset_engine() -> None: ...

create_tables(using=None) async

Source code in src/ferro/_core.pyi
# Type stub only. Creates tables for the registered models. ``using``
# presumably names the target connection, with ``None`` meaning the default
# one — NOTE(review): confirm against the native implementation.
async def create_tables(using: Optional[str] = None) -> None: ...

migrate(using=None, updates=True, destructive=False) async

Run the auto-migrate pass against a connected engine.

Creates missing tables, then (with updates, the default) adds missing model columns to existing tables and reconciles type/nullability drift on Postgres; with destructive it also drops live columns no longer on the model. destructive implies updates. The pool is refreshed after any DDL so no cached statement observes the pre-migration schema.

Source code in src/ferro/_core.pyi
async def migrate(
    using: Optional[str] = None,
    updates: bool = True,
    destructive: bool = False,
) -> None:
    """Run the auto-migrate pass against a connected engine.

    Creates missing tables, then (with ``updates``, the default) adds missing
    model columns to existing tables and reconciles type/nullability drift on
    Postgres; with ``destructive`` it also drops live columns no longer on the
    model. ``destructive`` implies ``updates``. The pool is refreshed after any
    DDL so no cached statement observes the pre-migration schema.

    Args:
        using: Presumably the name of the connection to migrate, with ``None``
            meaning the default connection — confirm against the native
            implementation.
        updates: When true (the default), add missing model columns to
            existing tables and reconcile type/nullability drift on Postgres.
        destructive: When true, also drop live columns that are no longer on
            the model; implies ``updates``.
    """
    ...

clear_registry()

Source code in src/ferro/_core.pyi
# Type stub only. Clears the global model registry.
# NOTE(review): whether identity maps or connections are also affected is not
# visible from this stub — confirm against the native implementation.
def clear_registry() -> None: ...

evict_instance(name, pk, using=None, session_id=None)

Source code in src/ferro/_core.pyi
# Type stub only. Removes one cached instance from the identity map.
# Presumably ``name`` is the model name, ``pk`` the primary key rendered as a
# string, ``using`` the connection name and ``session_id`` an optional session
# scope — NOTE(review): confirm against the native implementation.
def evict_instance(
    name: str, pk: str, using: Optional[str] = None, session_id: Optional[str] = None
) -> None: ...

version()

Source code in src/ferro/_core.pyi
# Type stub only. Returns a version string — presumably that of the native
# core; NOTE(review): confirm which component's version is reported.
def version() -> str: ...