Raw SQL

Ferro exposes a raw SQL escape hatch for statements that don't fit a Model — Postgres GUCs (set_config, SET LOCAL), advisory locks, LISTEN/NOTIFY, or any one-off query.

Raw SQL is an escape hatch

Bind values cross the FFI as wire-close primitives, and rows come back as dict[str, str | int | float | bool | bytes | None]. UUID/datetime/JSON columns are returned as strings. If you want typed rows, use the ORM.
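If a one-off query returns UUID, timestamp, or JSON columns and you still want Python objects, one option is to convert at the call site. The following is a minimal sketch, assuming the strings arrive in canonical UUID, ISO 8601, and JSON form. `coerce_row` and its `converters` mapping are hypothetical helpers, not part of Ferro, and the sample row is made up.

```python
import json
import uuid
from datetime import datetime
from typing import Any, Callable


# Hypothetical helper, not part of Ferro: apply per-column converters to a
# raw row dict. Columns without a converter, and NULLs, pass through as-is.
def coerce_row(
    row: dict[str, Any], converters: dict[str, Callable[[Any], Any]]
) -> dict[str, Any]:
    out = dict(row)
    for column, convert in converters.items():
        if out.get(column) is not None:
            out[column] = convert(out[column])
    return out


# A row shaped like the raw API's output (values invented for illustration).
raw_row = {
    "id": "6f1c2b9e-8a4d-4c3e-9b7a-2d5e8f0a1b3c",
    "created_at": "2024-05-01T12:30:00+00:00",
    "settings": '{"theme": "dark"}',
    "deleted_at": None,
}

typed_row = coerce_row(
    raw_row,
    {
        "id": uuid.UUID,
        "created_at": datetime.fromisoformat,
        "settings": json.loads,
        "deleted_at": datetime.fromisoformat,
    },
)
```

Once more than a couple of columns need this treatment, the ORM is probably the better fit.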

Two surfaces, same plumbing

Transaction-bound (preferred)

from ferro import transaction

async with transaction() as tx:
    await tx.execute(
        "select set_config('request.jwt.claims', $1, true)",
        claims_json,
    )
    rows = await tx.fetch_all(
        "select id, name from users where org_id = $1 limit 50",
        org.id,
    )
    row = await tx.fetch_one(
        "select count(*) as n from users where org_id = $1",
        org.id,
    )

The tx handle owns the transaction's connection and is valid only inside the block: calling tx.execute(...) after the async with block exits raises RuntimeError.

Top-level (using or active transaction)

from ferro import execute, fetch_all, fetch_one, transaction

# Outside any tx — runs on the default connection.
await execute("select pg_advisory_unlock_all()")

# Route explicitly to a named connection.
await execute("select run_pipeline_job($1)", job_id, using="service")

# Inside a tx — auto-picked up via the same ContextVar that Model.create() uses.
async with transaction(using="service"):
    await execute("select set_config('request.jwt.claims', $1, true)", claims_json)
    rows = await fetch_all("select * from foo where org_id = $1", org_id)

A transaction is pinned to one connection, and unqualified raw SQL inside it inherits that connection. For that reason, passing using=... inside an active transaction raises an error.

Placeholders are native to the backend

| Backend | Placeholder syntax | Example |
|---|---|---|
| Postgres | $1, $2, … | select set_config('k', $1, true) |
| SQLite | ? (positional) | select * from users where id = ? |

There is no translation layer. What you write is what sqlx::query(sql) runs. Mismatches surface as the database driver's own error.
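Because nothing rewrites placeholders, code that targets both backends has to produce the right tokens itself. A minimal sketch of one way to do that follows. `placeholders` is a hypothetical helper, not a Ferro function, and the backend names are plain strings chosen for the example. The f-strings interpolate only placeholder tokens, never values.

```python
def placeholders(backend: str, count: int, start: int = 1) -> str:
    """Return a comma-separated placeholder list in the backend's own syntax."""
    if backend == "postgres":
        return ", ".join(f"${i}" for i in range(start, start + count))
    if backend == "sqlite":
        return ", ".join("?" for _ in range(count))
    raise ValueError(f"unknown backend: {backend!r}")


ids = [3, 5, 8]

# Only placeholder tokens are interpolated; the ids are passed separately
# as positional bind values when the statement is executed.
pg_sql = (
    f"select id, name from users where id in ({placeholders('postgres', len(ids))})"
)
lite_sql = (
    f"select id, name from users where id in ({placeholders('sqlite', len(ids))})"
)
```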

Bind type table

| Python type | Sent as | Postgres cast you must write |
|---|---|---|
| None | NULL | |
| bool | bool | |
| int | i64 | |
| float | f64 | |
| str | text | |
| bytes / bytearray | bytea / blob | |
| uuid.UUID | text | $N::uuid |
| datetime.datetime | ISO 8601 text | $N::timestamptz |
| datetime.date | ISO 8601 text | $N::date |
| datetime.time | ISO 8601 text | $N::time |
| decimal.Decimal | text | $N::numeric |
| enum.Enum | recursive on .value | (depends on .value type) |
| dict / list | json.dumps(v) text | $N::jsonb |
| anything else | TypeError is raised | |

Raw SQL has no schema map, so Ferro does not auto-cast bind values. This matches asyncpg / psycopg / pgx behavior. Never f-string user input into the sql argument — use placeholders and pass values as positional args.
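The "Sent as" rules in the bind type table can be read as a small dispatch function. The sketch below approximates that mapping in pure Python for illustration. It is not Ferro's `_marshal`, and details such as converting bytearray to bytes or the exact error message are assumptions.

```python
import datetime
import decimal
import enum
import json
import uuid
from typing import Any


def marshal(value: Any) -> Any:
    """Reduce a Python value to a primitive, following the bind type table."""
    if isinstance(value, enum.Enum):
        return marshal(value.value)  # recurse on the member's value
    if value is None or isinstance(value, (bool, int, float, str)):
        return value  # already primitives
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    if isinstance(value, uuid.UUID):
        return str(value)  # text; needs $N::uuid on Postgres
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()  # ISO 8601 text
    if isinstance(value, decimal.Decimal):
        return str(value)  # text; needs $N::numeric on Postgres
    if isinstance(value, (dict, list)):
        return json.dumps(value)  # JSON text; needs $N::jsonb on Postgres
    raise TypeError(f"cannot bind value of type {type(value).__name__}")


class Status(enum.Enum):
    ACTIVE = "active"


bound = [
    marshal(v)
    for v in (Status.ACTIVE, datetime.date(2024, 5, 1), decimal.Decimal("19.99"))
]
```

The enum check comes first so that members of str- or int-based enums are reduced to their `.value` rather than passed through as themselves.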

Postgres cast cheat-sheet

"... where id = $1::uuid"                # uuid.UUID
"... where created_at = $1::timestamptz" # datetime
"... where day = $1::date"               # date
"... where amount = $1::numeric"         # Decimal
"... set data = $1::jsonb"               # dict / list

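If writing casts by hand gets repetitive, the cheat-sheet can be encoded as a lookup. This is a hedged sketch: `pg_placeholder` and `_PG_CASTS` are hypothetical names, not part of Ferro, and the table covers only the casts listed above.

```python
import datetime
import decimal
import uuid

# Ordered so datetime.datetime is matched before its parent class datetime.date.
_PG_CASTS = [
    (uuid.UUID, "uuid"),
    (datetime.datetime, "timestamptz"),
    (datetime.date, "date"),
    (datetime.time, "time"),
    (decimal.Decimal, "numeric"),
    ((dict, list), "jsonb"),
]


def pg_placeholder(position: int, value: object) -> str:
    """Return "$N" or "$N::cast" for a value bound at 1-based position."""
    for python_type, cast in _PG_CASTS:
        if isinstance(value, python_type):
            return f"${position}::{cast}"
    return f"${position}"


user_id = uuid.uuid4()
sql = (
    "update users set data = " + pg_placeholder(1, {"plan": "pro"})
    + " where id = " + pg_placeholder(2, user_id)
)
```

The values themselves still travel as positional bind arguments; only the placeholder text is generated.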
Connection affinity

Outside a transaction() block, each top-level execute / fetch_all / fetch_one call runs on the selected named pool (using=...) or the default pool. Consecutive calls may use different physical connections from that pool. Wrap in transaction(using=...) for connection-affinity-sensitive operations like SET LOCAL, advisory locks, or LISTEN/NOTIFY.
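Why affinity matters can be shown without Ferro at all. The sketch below is an analogy that uses Python's standard sqlite3 module: two connections to the same database file stand in for two pooled connections, and a TEMP table stands in for session-scoped state such as advisory locks or LISTEN registrations on Postgres. The file name is arbitrary.

```python
import os
import sqlite3
import tempfile

# Two connections to the same database file, standing in for two pooled
# connections that consecutive top-level calls might land on.
db_path = os.path.join(tempfile.mkdtemp(), "affinity.db")
conn_a = sqlite3.connect(db_path)
conn_b = sqlite3.connect(db_path)

# Session-scoped state: a TEMP table lives only on the connection that made it.
conn_a.execute("create temp table scratch (n integer)")
conn_a.execute("insert into scratch values (?)", (1,))

rows_on_a = conn_a.execute("select n from scratch").fetchall()

try:
    conn_b.execute("select n from scratch")
    visible_on_b = True
except sqlite3.OperationalError:  # the table does not exist on this connection
    visible_on_b = False

conn_a.close()
conn_b.close()
```

State set up on one connection is invisible on the other, which is the failure mode that pinning work to a single transaction avoids.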

What raw SQL doesn't do

  • No typed rows. Rows are always plain dicts of primitives. If you want uuid.UUID / datetime / Decimal objects, use Model.fetch_*.
  • No multi-statement strings. One statement per call.
  • No string-interpolation guard. The API encourages placeholders by shape (values are passed as positional args), but it cannot detect at runtime that the sql string was built with an f-string.
  • No auto type-casts on Postgres. Write $N::uuid / $N::jsonb yourself.

API reference

execute(sql, *args, using=None) async

Run a raw SQL statement, returning rows affected.

Honors the active transaction() block via the _CURRENT_TRANSACTION ContextVar. Outside any transaction, runs on a one-off pool connection. Two consecutive top-level execute calls outside a transaction may use different pool connections — wrap in transaction() if you need connection affinity (e.g. SET LOCAL, advisory locks, LISTEN/NOTIFY).

Source code in src/ferro/raw.py
async def execute(sql: str, *args: Any, using: str | None = None) -> int:
    """Run a raw SQL statement, returning rows affected.

    Honors the active ``transaction()`` block via the ``_CURRENT_TRANSACTION``
    ContextVar. Outside any transaction, runs on a one-off pool connection.
    Two consecutive top-level ``execute`` calls outside a transaction may use
    different pool connections — wrap in ``transaction()`` if you need
    connection affinity (e.g. ``SET LOCAL``, advisory locks, ``LISTEN/NOTIFY``).
    """
    _check_sql(sql)
    marshalled = [_marshal(a) for a in args]
    tx_id, using = _transaction_or_using(using)
    return await _raw_execute(sql, marshalled, tx_id, using)

fetch_all(sql, *args, using=None) async

Run a raw SQL query and return all rows as a list of dicts.

Values are wire-close primitives. UUID/datetime/JSON columns come back as strings. If you want typed rows, use the ORM.

Source code in src/ferro/raw.py
async def fetch_all(
    sql: str, *args: Any, using: str | None = None
) -> list[dict[str, Any]]:
    """Run a raw SQL query and return all rows as a list of dicts.

    Values are wire-close primitives. UUID/datetime/JSON columns come back as
    strings. If you want typed rows, use the ORM.
    """
    _check_sql(sql)
    marshalled = [_marshal(a) for a in args]
    tx_id, using = _transaction_or_using(using)
    return await _raw_fetch_all(sql, marshalled, tx_id, using)

fetch_one(sql, *args, using=None) async

Run a raw SQL query and return the first row as a dict, or None.

Callers should LIMIT 1 if the query may return more than one row.

Source code in src/ferro/raw.py
async def fetch_one(
    sql: str, *args: Any, using: str | None = None
) -> dict[str, Any] | None:
    """Run a raw SQL query and return the first row as a dict, or ``None``.

    Callers should ``LIMIT 1`` if the query may return more than one row.
    """
    _check_sql(sql)
    marshalled = [_marshal(a) for a in args]
    tx_id, using = _transaction_or_using(using)
    return await _raw_fetch_one(sql, marshalled, tx_id, using)

Transaction

Handle for a live transaction.

Obtained via async with transaction() as tx. Methods delegate to the top-level execute / fetch_all / fetch_one with this transaction's tx_id set explicitly, so they don't depend on the ContextVar state. This makes the connection-affinity invariant structurally impossible to violate from inside the async with block.

The handle becomes invalid once the async with block exits: any subsequent call raises RuntimeError.

Source code in src/ferro/raw.py
class Transaction:
    """Handle for a live transaction.

    Obtained via ``async with transaction() as tx``. Methods delegate to the
    top-level :func:`execute` / :func:`fetch_all` / :func:`fetch_one` with this
    transaction's ``tx_id`` set explicitly, so they don't depend on the
    ContextVar state. This makes the connection-affinity invariant
    structurally impossible to violate from inside the ``async with`` block.

    The handle becomes invalid once the ``async with`` block exits — any
    subsequent call raises :class:`RuntimeError`.
    """

    __slots__ = ("_tx_id",)

    def __init__(self, tx_id: str) -> None:
        self._tx_id = tx_id

    async def execute(self, sql: str, *args: Any) -> int:
        _check_sql(sql)
        marshalled = [_marshal(a) for a in args]
        return await _raw_execute(sql, marshalled, self._tx_id)

    async def fetch_all(self, sql: str, *args: Any) -> list[dict[str, Any]]:
        _check_sql(sql)
        marshalled = [_marshal(a) for a in args]
        return await _raw_fetch_all(sql, marshalled, self._tx_id)

    async def fetch_one(self, sql: str, *args: Any) -> dict[str, Any] | None:
        _check_sql(sql)
        marshalled = [_marshal(a) for a in args]
        return await _raw_fetch_one(sql, marshalled, self._tx_id)
