Queries

Model.where(...) and Model.select() return a Query, a chainable builder that runs only when you await a terminal method: all(), first(), count(), exists(), update(), or delete(). Builder methods such as where(), order_by(), limit(), and offset() modify the query in place and return the same instance. Predicates are lambda-first (User.where(lambda t: t.age >= 18)); col() is the compatibility bridge for operator-shaped predicates, and the direct operator style is deprecated and scheduled for removal in v0.14.0. For migration steps, see Migrating to v0.12.0.

Query

Bases: Generic[T]

Build and execute fluent ORM queries.

Attributes:

Name Type Description
model_cls

Model class used to hydrate results.

where_clause list[QueryNode]

Accumulated filter nodes for the query.

order_by_clause list[dict[str, str]]

Sort definitions sent to the Rust core.

Source code in src/ferro/query/builder.py
class Query(Generic[T]):
    """Build and execute fluent ORM queries.

    Attributes:
        model_cls: Model class used to hydrate results.
        where_clause: Accumulated filter nodes for the query.
        order_by_clause: Sort definitions sent to the Rust core.
    """

    def __init__(
        self, model_cls: Type[T], using: str | None = None, session: Any | None = None
    ):
        """Initialize a query for a model class.

        Args:
            model_cls: Model class that defines the target table.

        Examples:
            >>> query = Query(User)
            >>> query.model_cls is User
            True
        """
        self.model_cls = model_cls
        self._using = using
        self._session = session
        self.where_clause: list["QueryNode"] = []
        self.order_by_clause: list[dict[str, str]] = []
        self._limit: int | None = None
        self._offset: int | None = None
        self._m2m_context: dict[str, Any] | None = None

    def _transaction_or_using(self) -> tuple[str | None, str | None, str | None]:
        from ..state import resolve_operation_scope

        return resolve_operation_scope(
            using=self._using, session=self._session, allow_legacy_default=True
        )

    def _m2m(
        self, join_table: str, source_col: str, target_col: str, source_id: Any
    ) -> "Query[T]":
        """Store many-to-many linkage context for relationship operations"""
        self._m2m_context = {
            "join_table": join_table,
            "source_col": source_col,
            "target_col": target_col,
            "source_id": source_id,
        }
        return self

    @overload
    def where(self, node: "QueryNode") -> "Query[T]": ...

    @overload
    def where(self, node: "Predicate[T]") -> "Query[T]": ...

    def where(self, node: "QueryNode | Predicate[T]") -> "Query[T]":
        """Add a filter condition to the query.

        The recommended style is a lambda predicate of shape
        ``Callable[[QueryProxy[T]], QueryNode]``. The lambda receives a
        fresh :class:`QueryProxy` whose attributes return
        :class:`FieldProxy` instances, so ``lambda t: t.archived == False``
        builds a comparison without static-typing friction. A prebuilt
        :class:`QueryNode` is also accepted, built either with
        :func:`ferro.query.col` (the type-safe escape hatch that preserves
        operator shape) or with operator syntax on class attributes. The
        bare operator form (``User.where(User.age >= 18)``) is deprecated and
        on the v0.14.0 removal track. It does not
        type-check statically:
        the class attribute types as the field type, so the comparison
        resolves to ``bool``, not ``QueryNode``.

        Args:
            node: A predicate callable or a ``QueryNode``.

        Returns:
            The current Query instance for chaining.

        Raises:
            TypeError: If ``node`` is neither a ``QueryNode`` nor a callable,
                or if the callable does not return a ``QueryNode``.

        Examples:
            >>> q1 = User.where(lambda t: t.archived == False)  # noqa: E712
            >>> q2 = User.where(lambda t: t.id == 1)
            >>> isinstance(q1, Query) and isinstance(q2, Query)
            True
        """
        self.where_clause.append(_resolve_where_node(node))
        return self

    def order_by(self, field: Any, direction: str = "asc") -> "Query[T]":
        """Add an ordering clause to the query

        Args:
            field: The field to order by (e.g., User.username).
            direction: The direction of the sort ("asc" or "desc").

        Returns:
            The current Query instance for chaining.

        Raises:
            ValueError: If direction is not "asc" or "desc".

        Examples:
            >>> query = User.select().order_by(User.username, "desc")
            >>> query.order_by_clause[-1]["direction"]
            'desc'
        """
        if direction.lower() not in ("asc", "desc"):
            raise ValueError("direction must be 'asc' or 'desc'")

        col_name = field.column if hasattr(field, "column") else str(field)
        self.order_by_clause.append(
            {"column": col_name, "direction": direction.lower()}
        )
        return self

    def limit(self, value: int) -> "Query[T]":
        """Limit the number of records returned

        Args:
            value: The maximum number of records to return.

        Returns:
            The current Query instance for chaining.

        Examples:
            >>> query = User.select().limit(10)
            >>> query._limit
            10
        """
        self._limit = value
        return self

    def offset(self, value: int) -> "Query[T]":
        """Skip a specific number of records

        Args:
            value: The number of records to skip.

        Returns:
            The current Query instance for chaining.

        Examples:
            >>> query = User.select().offset(20)
            >>> query._offset
            20
        """
        self._offset = value
        return self

    async def all(self) -> list[T]:
        """Return all model instances that match the current query

        Returns:
            A list of model instances.

        Examples:
            >>> users = await User.where(lambda t: t.active == True).all()  # noqa: E712
            >>> isinstance(users, list)
            True
        """
        query_def = {
            "model_name": self.model_cls.__name__,
            "where": [node.to_ir_dict() for node in self.where_clause],
            "order_by": self.order_by_clause,
            "limit": self._limit,
            "offset": self._offset,
            "m2m": self._m2m_context,
        }
        tx_id, using, session_id = self._transaction_or_using()
        results = await fetch_filtered(
            self.model_cls,
            _query_ir_payload_to_json(query_def),
            tx_id,
            using,
            session_id=session_id,
        )
        for instance in results:
            if hasattr(self.model_cls, "_fix_types"):
                self.model_cls._fix_types(instance)
        return results

    async def count(self) -> int:
        """Return the number of records that match the current query

        Returns:
            The count of matching records.

        Examples:
            >>> total = await User.where(lambda t: t.active == True).count()  # noqa: E712
            >>> isinstance(total, int)
            True
        """
        query_def = {
            "model_name": self.model_cls.__name__,
            "where": [node.to_ir_dict() for node in self.where_clause],
            "order_by": [],
            "limit": None,
            "offset": None,
            "m2m": self._m2m_context,
        }
        tx_id, using, session_id = self._transaction_or_using()
        return await count_filtered(
            self.model_cls.__name__,
            _query_ir_payload_to_json(query_def),
            tx_id,
            using,
            session_id=session_id,
        )

    async def update(self, **fields) -> int:
        """Update all records matching the current query

        Args:
            **fields: Field names and values to update.

        Returns:
            The number of records updated.

        Examples:
            >>> updated = await User.where(lambda t: t.id == 1).update(name="Taylor")
            >>> isinstance(updated, int)
            True
        """
        query_def = {
            "model_name": self.model_cls.__name__,
            "where": [node.to_ir_dict() for node in self.where_clause],
            "order_by": [],
            "limit": self._limit,
            "offset": self._offset,
            "m2m": None,
        }
        from pydantic_core import to_json

        tx_id, using, session_id = self._transaction_or_using()
        # Use pydantic_core.to_json to handle Decimals, UUIDs, etc. in kwargs
        return await update_filtered(
            self.model_cls.__name__,
            _query_ir_payload_to_json(query_def),
            to_json(fields).decode(),
            tx_id,
            using,
            session_id=session_id,
        )

    async def first(self) -> T | None:
        """Return the first matching record, or None

        Returns:
            A model instance or None.

        Examples:
            >>> user = await User.select().order_by(User.id).first()
            >>> user is None or isinstance(user, User)
            True
        """
        old_limit = self._limit
        self._limit = 1
        try:
            results = await self.all()
            return results[0] if results else None
        finally:
            self._limit = old_limit

    async def delete(self) -> int:
        """Delete all records matching the current query

        Returns:
            The number of records deleted.

        Examples:
            >>> deleted = await User.where(lambda t: t.disabled == True).delete()  # noqa: E712
            >>> isinstance(deleted, int)
            True
        """
        query_def = {
            "model_name": self.model_cls.__name__,
            "where": [node.to_ir_dict() for node in self.where_clause],
            "order_by": [],
            "limit": self._limit,
            "offset": self._offset,
            "m2m": None,
        }
        tx_id, using, session_id = self._transaction_or_using()
        return await delete_filtered(
            self.model_cls.__name__,
            _query_ir_payload_to_json(query_def),
            tx_id,
            using,
            session_id=session_id,
        )

    async def exists(self) -> bool:
        """Return whether at least one record matches the current query

        Returns:
            True if records exist, otherwise False.

        Examples:
            >>> found = await User.where(lambda t: t.email == "a@b.com").exists()
            >>> isinstance(found, bool)
            True
        """
        return await self.count() > 0

    async def add(self, *instances: Any) -> None:
        """Add links to a many-to-many relationship

        Args:
            *instances: Target model instances that provide an ``id`` attribute.

        Raises:
            RuntimeError: If the query is not bound to a many-to-many context.

        Examples:
            >>> user = await User.create(email="taylor@example.com")
            >>> admin = await Group.create(name="admin")
            >>> staff = await Group.create(name="staff")
            >>> await user.groups.add(admin, staff)
        """
        if not self._m2m_context:
            raise RuntimeError(
                "'.add()' can only be used on Many-to-Many relationships"
            )

        ids = []
        for inst in instances:
            # Assume 'id' for now
            ids.append(getattr(inst, "id"))

        from ..state import _CURRENT_TRANSACTION

        tx_id, using, session_id = self._transaction_or_using()
        await add_m2m_links(
            self._m2m_context["join_table"],
            self._m2m_context["source_col"],
            self._m2m_context["target_col"],
            self._m2m_context["source_id"],
            ids,
            tx_id,
            using,
            session_id=session_id,
        )

    async def remove(self, *instances: Any) -> None:
        """Remove links from a many-to-many relationship

        Args:
            *instances: Target model instances that provide an ``id`` attribute.

        Raises:
            RuntimeError: If the query is not bound to a many-to-many context.

        Examples:
            >>> user = await User.create(email="taylor@example.com")
            >>> admin = await Group.create(name="admin")
            >>> await user.groups.remove(admin)
        """
        if not self._m2m_context:
            raise RuntimeError(
                "'.remove()' can only be used on Many-to-Many relationships"
            )

        ids = []
        for inst in instances:
            ids.append(getattr(inst, "id"))

        from ..state import _CURRENT_TRANSACTION

        tx_id, using, session_id = self._transaction_or_using()
        await remove_m2m_links(
            self._m2m_context["join_table"],
            self._m2m_context["source_col"],
            self._m2m_context["target_col"],
            self._m2m_context["source_id"],
            ids,
            tx_id,
            using,
            session_id=session_id,
        )

    async def clear(self) -> None:
        """Clear all links in a many-to-many relationship

        Raises:
            RuntimeError: If the query is not bound to a many-to-many context.

        Examples:
            >>> user = await User.create(email="taylor@example.com")
            >>> await user.groups.clear()
        """
        if not self._m2m_context:
            raise RuntimeError(
                "'.clear()' can only be used on Many-to-Many relationships"
            )

        from ..state import _CURRENT_TRANSACTION

        tx_id, using, session_id = self._transaction_or_using()
        await clear_m2m_links(
            self._m2m_context["join_table"],
            self._m2m_context["source_col"],
            self._m2m_context["source_id"],
            tx_id,
            using,
            session_id=session_id,
        )

    def __repr__(self):
        """Return a developer-friendly representation of the query"""
        return f"<Query model={self.model_cls.__name__} where={self.where_clause}>"

Attributes

model_cls = model_cls instance-attribute

where_clause = [] instance-attribute

order_by_clause = [] instance-attribute

Functions

__init__(model_cls, using=None, session=None)

Initialize a query for a model class.

Parameters:

Name Type Description Default
model_cls Type[T]

Model class that defines the target table.

required

Examples:

>>> query = Query(User)
>>> query.model_cls is User
True
Source code in src/ferro/query/builder.py
def __init__(
    self, model_cls: Type[T], using: str | None = None, session: Any | None = None
):
    """Initialize a query for a model class.

    Args:
        model_cls: Model class that defines the target table.

    Examples:
        >>> query = Query(User)
        >>> query.model_cls is User
        True
    """
    self.model_cls = model_cls
    self._using = using
    self._session = session
    self.where_clause: list["QueryNode"] = []
    self.order_by_clause: list[dict[str, str]] = []
    self._limit: int | None = None
    self._offset: int | None = None
    self._m2m_context: dict[str, Any] | None = None

where(node)

where(node: QueryNode) -> Query[T]
where(node: Predicate[T]) -> Query[T]

Add a filter condition to the query.

The recommended style is a lambda predicate of shape Callable[[QueryProxy[T]], QueryNode]. The lambda receives a fresh QueryProxy whose attributes return FieldProxy instances, so lambda t: t.archived == False builds a comparison without static-typing friction. A prebuilt QueryNode is also accepted, built either with ferro.query.col (the type-safe escape hatch that preserves operator shape) or with operator syntax on class attributes. The bare operator form (User.where(User.age >= 18)) is deprecated and scheduled for removal in v0.14.0. It does not type-check statically: the class attribute is typed as the field type, so the comparison resolves to bool, not QueryNode.
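To make the lambda mechanism concrete, the following is a minimal, self-contained sketch. The classes are simplified stand-ins written for illustration, not Ferro's actual QueryProxy, FieldProxy, or QueryNode, and the resolve helper is hypothetical: it only mirrors the accept-or-raise behaviour this section documents.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class QueryNode:
    """Stand-in filter node: column, operator, value."""

    column: str
    op: str
    value: Any


class FieldProxy:
    """Stand-in column handle whose comparisons build QueryNode objects."""

    def __init__(self, column: str) -> None:
        self.column = column

    def __eq__(self, other: Any) -> QueryNode:  # type: ignore[override]
        return QueryNode(self.column, "==", other)

    def __ge__(self, other: Any) -> QueryNode:
        return QueryNode(self.column, ">=", other)


class QueryProxy:
    """Stand-in proxy: every attribute access yields a FieldProxy."""

    def __getattr__(self, name: str) -> FieldProxy:
        return FieldProxy(name)


def resolve(node: Any) -> QueryNode:
    """Hypothetical resolver: accept a QueryNode or a predicate callable."""
    if isinstance(node, QueryNode):
        return node
    if callable(node):
        result = node(QueryProxy())
        if not isinstance(result, QueryNode):
            raise TypeError("predicate must return a QueryNode")
        return result
    raise TypeError("expected a QueryNode or a callable")


adult = resolve(lambda t: t.age >= 18)
print(adult)  # QueryNode(column='age', op='>=', value=18)
```

Because the lambda's argument is a proxy rather than the model class, a type checker never sees the field's annotated type, which is why the comparison is not inferred as bool in this style.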

Parameters:

Name Type Description Default
node QueryNode | Predicate[T]

A predicate callable or a QueryNode.

required

Returns:

Type Description
Query[T]

The current Query instance for chaining.

Raises:

Type Description
TypeError

If node is neither a QueryNode nor a callable, or if the callable does not return a QueryNode.

Examples:

>>> q1 = User.where(lambda t: t.archived == False)  # noqa: E712
>>> q2 = User.where(lambda t: t.id == 1)
>>> isinstance(q1, Query) and isinstance(q2, Query)
True
Source code in src/ferro/query/builder.py
def where(self, node: "QueryNode | Predicate[T]") -> "Query[T]":
    """Add a filter condition to the query.

    The recommended style is a lambda predicate of shape
    ``Callable[[QueryProxy[T]], QueryNode]``. The lambda receives a
    fresh :class:`QueryProxy` whose attributes return
    :class:`FieldProxy` instances, so ``lambda t: t.archived == False``
    builds a comparison without static-typing friction. A prebuilt
    :class:`QueryNode` is also accepted, built either with
    :func:`ferro.query.col` (the type-safe escape hatch that preserves
    operator shape) or with operator syntax on class attributes. The
    bare operator form (``User.where(User.age >= 18)``) is deprecated and
    on the v0.14.0 removal track. It does not
    type-check statically:
    the class attribute types as the field type, so the comparison
    resolves to ``bool``, not ``QueryNode``.

    Args:
        node: A predicate callable or a ``QueryNode``.

    Returns:
        The current Query instance for chaining.

    Raises:
        TypeError: If ``node`` is neither a ``QueryNode`` nor a callable,
            or if the callable does not return a ``QueryNode``.

    Examples:
        >>> q1 = User.where(lambda t: t.archived == False)  # noqa: E712
        >>> q2 = User.where(lambda t: t.id == 1)
        >>> isinstance(q1, Query) and isinstance(q2, Query)
        True
    """
    self.where_clause.append(_resolve_where_node(node))
    return self

order_by(field, direction='asc')

Add an ordering clause to the query

Parameters:

Name Type Description Default
field Any

The field to order by (e.g., User.username).

required
direction str

The direction of the sort ("asc" or "desc").

'asc'

Returns:

Type Description
Query[T]

The current Query instance for chaining.

Raises:

Type Description
ValueError

If direction is not "asc" or "desc".

Examples:

>>> query = User.select().order_by(User.username, "desc")
>>> query.order_by_clause[-1]["direction"]
'desc'
Source code in src/ferro/query/builder.py
def order_by(self, field: Any, direction: str = "asc") -> "Query[T]":
    """Add an ordering clause to the query

    Args:
        field: The field to order by (e.g., User.username).
        direction: The direction of the sort ("asc" or "desc").

    Returns:
        The current Query instance for chaining.

    Raises:
        ValueError: If direction is not "asc" or "desc".

    Examples:
        >>> query = User.select().order_by(User.username, "desc")
        >>> query.order_by_clause[-1]["direction"]
        'desc'
    """
    if direction.lower() not in ("asc", "desc"):
        raise ValueError("direction must be 'asc' or 'desc'")

    col_name = field.column if hasattr(field, "column") else str(field)
    self.order_by_clause.append(
        {"column": col_name, "direction": direction.lower()}
    )
    return self
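
The validation and normalisation in the source above can be exercised in isolation. The sketch below copies that logic into a free function; normalize_order and FakeField are hypothetical names introduced only for this illustration.

```python
from typing import Any, Dict


def normalize_order(field: Any, direction: str = "asc") -> Dict[str, str]:
    """Mirror of the checks shown in order_by()."""
    if direction.lower() not in ("asc", "desc"):
        raise ValueError("direction must be 'asc' or 'desc'")
    # Objects exposing .column contribute that name; anything else is str()-ed.
    col_name = field.column if hasattr(field, "column") else str(field)
    return {"column": col_name, "direction": direction.lower()}


class FakeField:
    """Hypothetical stand-in for a class attribute such as User.username."""

    column = "username"


print(normalize_order(FakeField(), "DESC"))  # {'column': 'username', 'direction': 'desc'}
print(normalize_order("created_at"))         # {'column': 'created_at', 'direction': 'asc'}
```

Two details follow from this logic: the direction is matched case-insensitively and stored in lowercase, and a plain string is accepted as a field because anything without a column attribute is passed through str().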

limit(value)

Limit the number of records returned

Parameters:

Name Type Description Default
value int

The maximum number of records to return.

required

Returns:

Type Description
Query[T]

The current Query instance for chaining.

Examples:

>>> query = User.select().limit(10)
>>> query._limit
10
Source code in src/ferro/query/builder.py
def limit(self, value: int) -> "Query[T]":
    """Limit the number of records returned

    Args:
        value: The maximum number of records to return.

    Returns:
        The current Query instance for chaining.

    Examples:
        >>> query = User.select().limit(10)
        >>> query._limit
        10
    """
    self._limit = value
    return self

offset(value)

Skip a specific number of records

Parameters:

Name Type Description Default
value int

The number of records to skip.

required

Returns:

Type Description
Query[T]

The current Query instance for chaining.

Examples:

>>> query = User.select().offset(20)
>>> query._offset
20
Source code in src/ferro/query/builder.py
def offset(self, value: int) -> "Query[T]":
    """Skip a specific number of records

    Args:
        value: The number of records to skip.

    Returns:
        The current Query instance for chaining.

    Examples:
        >>> query = User.select().offset(20)
        >>> query._offset
        20
    """
    self._offset = value
    return self
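
limit() and offset() are typically combined for pagination. The sketch below uses a stand-in builder with the same setter shape as the source above; PageBuilder and the paginate helper are hypothetical and not part of Ferro.

```python
from __future__ import annotations


class PageBuilder:
    """Stand-in with the same limit()/offset() shape as the source above."""

    def __init__(self) -> None:
        self._limit: int | None = None
        self._offset: int | None = None

    def limit(self, value: int) -> PageBuilder:
        self._limit = value
        return self  # same object, so calls chain

    def offset(self, value: int) -> PageBuilder:
        self._offset = value
        return self


def paginate(builder: PageBuilder, page: int, page_size: int) -> PageBuilder:
    """Hypothetical helper: map a 1-based page number onto limit/offset."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    return builder.limit(page_size).offset((page - 1) * page_size)


q = PageBuilder()
third_page = paginate(q, page=3, page_size=20)
print(third_page._limit, third_page._offset)  # 20 40
print(third_page is q)                        # True
```

Since each setter returns the object it was called on, the chained result and the original variable refer to the same builder; a later limit() or offset() call simply overwrites the earlier value.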

all() async

Return all model instances that match the current query

Returns:

Type Description
list[T]

A list of model instances.

Examples:

>>> users = await User.where(lambda t: t.active == True).all()  # noqa: E712
>>> isinstance(users, list)
True
Source code in src/ferro/query/builder.py
async def all(self) -> list[T]:
    """Return all model instances that match the current query

    Returns:
        A list of model instances.

    Examples:
        >>> users = await User.where(lambda t: t.active == True).all()  # noqa: E712
        >>> isinstance(users, list)
        True
    """
    query_def = {
        "model_name": self.model_cls.__name__,
        "where": [node.to_ir_dict() for node in self.where_clause],
        "order_by": self.order_by_clause,
        "limit": self._limit,
        "offset": self._offset,
        "m2m": self._m2m_context,
    }
    tx_id, using, session_id = self._transaction_or_using()
    results = await fetch_filtered(
        self.model_cls,
        _query_ir_payload_to_json(query_def),
        tx_id,
        using,
        session_id=session_id,
    )
    for instance in results:
        if hasattr(self.model_cls, "_fix_types"):
            self.model_cls._fix_types(instance)
    return results

count() async

Return the number of records that match the current query

Returns:

Type Description
int

The count of matching records.

Examples:

>>> total = await User.where(lambda t: t.active == True).count()  # noqa: E712
>>> isinstance(total, int)
True
Source code in src/ferro/query/builder.py
async def count(self) -> int:
    """Return the number of records that match the current query

    Returns:
        The count of matching records.

    Examples:
        >>> total = await User.where(lambda t: t.active == True).count()  # noqa: E712
        >>> isinstance(total, int)
        True
    """
    query_def = {
        "model_name": self.model_cls.__name__,
        "where": [node.to_ir_dict() for node in self.where_clause],
        "order_by": [],
        "limit": None,
        "offset": None,
        "m2m": self._m2m_context,
    }
    tx_id, using, session_id = self._transaction_or_using()
    return await count_filtered(
        self.model_cls.__name__,
        _query_ir_payload_to_json(query_def),
        tx_id,
        using,
        session_id=session_id,
    )
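
Judging by the payload built in the source above, count() forwards the filters but blanks out ordering and paging. The sketch below contrasts the two payload shapes; the helper names and the shape of the "where" entry are assumptions made for illustration, since the real entries come from QueryNode.to_ir_dict().

```python
from __future__ import annotations

import json
from typing import Any

# Hypothetical builder state; the layout of each "where" entry is an assumption.
state: dict[str, Any] = {
    "model_name": "User",
    "where": [{"column": "active", "op": "eq", "value": True}],
    "order_by": [{"column": "id", "direction": "desc"}],
    "limit": 10,
    "offset": 20,
    "m2m": None,
}


def fetch_payload(s: dict[str, Any]) -> dict[str, Any]:
    """Payload shape used by all(): every builder setting is forwarded."""
    return dict(s)


def count_payload(s: dict[str, Any]) -> dict[str, Any]:
    """Payload shape used by count(): ordering and paging are blanked out."""
    return {**s, "order_by": [], "limit": None, "offset": None}


print(json.dumps(fetch_payload(state)))
print(json.dumps(count_payload(state)))
```

One practical consequence, under the assumption that the core honours the payload as sent: calling count() on a query that has limit(10) applied counts every matching row, not at most ten.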

update(**fields) async

Update all records matching the current query

Parameters:

Name Type Description Default
**fields

Field names and values to update.

{}

Returns:

Type Description
int

The number of records updated.

Examples:

>>> updated = await User.where(lambda t: t.id == 1).update(name="Taylor")
>>> isinstance(updated, int)
True
Source code in src/ferro/query/builder.py
async def update(self, **fields) -> int:
    """Update all records matching the current query

    Args:
        **fields: Field names and values to update.

    Returns:
        The number of records updated.

    Examples:
        >>> updated = await User.where(lambda t: t.id == 1).update(name="Taylor")
        >>> isinstance(updated, int)
        True
    """
    query_def = {
        "model_name": self.model_cls.__name__,
        "where": [node.to_ir_dict() for node in self.where_clause],
        "order_by": [],
        "limit": self._limit,
        "offset": self._offset,
        "m2m": None,
    }
    from pydantic_core import to_json

    tx_id, using, session_id = self._transaction_or_using()
    # Use pydantic_core.to_json to handle Decimals, UUIDs, etc. in kwargs
    return await update_filtered(
        self.model_cls.__name__,
        _query_ir_payload_to_json(query_def),
        to_json(fields).decode(),
        tx_id,
        using,
        session_id=session_id,
    )
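
The comment in the source above explains why the update values are not encoded with the standard library: values such as Decimal and UUID are rejected by json.dumps. The sketch below demonstrates that failure using only the standard library. The default=str fallback is a stand-in chosen for this illustration; it is not what Ferro does and its output format is not claimed to match pydantic_core.to_json.

```python
import json
import uuid
from decimal import Decimal

fields = {"balance": Decimal("19.99"), "token": uuid.UUID(int=1)}

# The stdlib encoder has no rule for Decimal or UUID and raises TypeError.
try:
    json.dumps(fields)
    stdlib_failed = False
except TypeError:
    stdlib_failed = True
print(stdlib_failed)  # True

# Stand-in fallback (an assumption for this sketch): stringify unknown types.
encoded = json.dumps(fields, default=str)
print(encoded)  # {"balance": "19.99", "token": "00000000-0000-0000-0000-000000000001"}
```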

first() async

Return the first matching record, or None

Returns:

Type Description
T | None

A model instance or None.

Examples:

>>> user = await User.select().order_by(User.id).first()
>>> user is None or isinstance(user, User)
True
Source code in src/ferro/query/builder.py
async def first(self) -> T | None:
    """Return the first matching record, or None

    Returns:
        A model instance or None.

    Examples:
        >>> user = await User.select().order_by(User.id).first()
        >>> user is None or isinstance(user, User)
        True
    """
    old_limit = self._limit
    self._limit = 1
    try:
        results = await self.all()
        return results[0] if results else None
    finally:
        self._limit = old_limit
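
The save-and-restore pattern in the source above means first() leaves any previously configured limit untouched. The sketch below reproduces the pattern on a stand-in class backed by an in-memory list; MiniQuery is hypothetical and performs no database access.

```python
import asyncio


class MiniQuery:
    """Stand-in that records which limit each all() call ran with."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._limit = None
        self.seen_limits = []

    def limit(self, value):
        self._limit = value
        return self

    async def all(self):
        self.seen_limits.append(self._limit)
        return self._rows[: self._limit]  # a limit of None keeps every row

    async def first(self):
        old_limit = self._limit
        self._limit = 1  # temporary override for this one call
        try:
            results = await self.all()
            return results[0] if results else None
        finally:
            self._limit = old_limit  # restored even if all() raises


async def main():
    q = MiniQuery(["ann", "bob", "cy"]).limit(2)
    first = await q.first()
    rest = await q.all()
    return first, rest, q.seen_limits


first, rest, seen = asyncio.run(main())
print(first, rest, seen)  # ann ['ann', 'bob'] [1, 2]
```

In this sketch the override lives on the shared instance for the duration of the await, so two tasks using the same query object concurrently could observe the temporary limit of 1. Building a separate query per task avoids that.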

delete() async

Delete all records matching the current query

Returns:

Type Description
int

The number of records deleted.

Examples:

>>> deleted = await User.where(lambda t: t.disabled == True).delete()  # noqa: E712
>>> isinstance(deleted, int)
True
Source code in src/ferro/query/builder.py
async def delete(self) -> int:
    """Delete all records matching the current query

    Returns:
        The number of records deleted.

    Examples:
        >>> deleted = await User.where(lambda t: t.disabled == True).delete()  # noqa: E712
        >>> isinstance(deleted, int)
        True
    """
    query_def = {
        "model_name": self.model_cls.__name__,
        "where": [node.to_ir_dict() for node in self.where_clause],
        "order_by": [],
        "limit": self._limit,
        "offset": self._offset,
        "m2m": None,
    }
    tx_id, using, session_id = self._transaction_or_using()
    return await delete_filtered(
        self.model_cls.__name__,
        _query_ir_payload_to_json(query_def),
        tx_id,
        using,
        session_id=session_id,
    )

exists() async

Return whether at least one record matches the current query

Returns:

Type Description
bool

True if records exist, otherwise False.

Examples:

>>> found = await User.where(lambda t: t.email == "a@b.com").exists()
>>> isinstance(found, bool)
True
Source code in src/ferro/query/builder.py
async def exists(self) -> bool:
    """Return whether at least one record matches the current query

    Returns:
        True if records exist, otherwise False.

    Examples:
        >>> found = await User.where(lambda t: t.email == "a@b.com").exists()
        >>> isinstance(found, bool)
        True
    """
    return await self.count() > 0
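
The one-line body above relies on operator precedence: await binds more tightly than comparison operators, so the expression is evaluated as (await self.count()) > 0. A minimal standalone sketch with stand-in coroutines:

```python
import asyncio


async def count() -> int:
    """Stand-in for an awaited row count."""
    return 3


async def exists() -> bool:
    # Parsed as (await count()) > 0, never as await (count() > 0).
    return await count() > 0


print(asyncio.run(exists()))  # True
```

As written in the source, exists() is implemented through count(), so it issues a count over all matching rows rather than a separate single-row probe.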

add(*instances) async

Add links to a many-to-many relationship

Parameters:

Name Type Description Default
*instances Any

Target model instances that provide an id attribute.

()

Raises:

Type Description
RuntimeError

If the query is not bound to a many-to-many context.

Examples:

>>> user = await User.create(email="taylor@example.com")
>>> admin = await Group.create(name="admin")
>>> staff = await Group.create(name="staff")
>>> await user.groups.add(admin, staff)
Source code in src/ferro/query/builder.py
async def add(self, *instances: Any) -> None:
    """Add links to a many-to-many relationship

    Args:
        *instances: Target model instances that provide an ``id`` attribute.

    Raises:
        RuntimeError: If the query is not bound to a many-to-many context.

    Examples:
        >>> user = await User.create(email="taylor@example.com")
        >>> admin = await Group.create(name="admin")
        >>> staff = await Group.create(name="staff")
        >>> await user.groups.add(admin, staff)
    """
    if not self._m2m_context:
        raise RuntimeError(
            "'.add()' can only be used on Many-to-Many relationships"
        )

    ids = []
    for inst in instances:
        # Assume 'id' for now
        ids.append(getattr(inst, "id"))

    from ..state import _CURRENT_TRANSACTION

    tx_id, using, session_id = self._transaction_or_using()
    await add_m2m_links(
        self._m2m_context["join_table"],
        self._m2m_context["source_col"],
        self._m2m_context["target_col"],
        self._m2m_context["source_id"],
        ids,
        tx_id,
        using,
        session_id=session_id,
    )
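
The guard and id collection in the source above can be illustrated without a database. The sketch below is synchronous for brevity (the real method is async), and RelationQuery, the context values, and the linked_ids list are hypothetical stand-ins for the awaited add_m2m_links call.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


class RelationQuery:
    """Stand-in showing the many-to-many guard used by add()."""

    def __init__(self, m2m_context: dict[str, Any] | None = None) -> None:
        self._m2m_context = m2m_context
        self.linked_ids: list[Any] = []

    def add(self, *instances: Any) -> None:
        if not self._m2m_context:
            raise RuntimeError(
                "'.add()' can only be used on Many-to-Many relationships"
            )
        # Each instance must expose an ``id`` attribute, as in the source.
        ids = [getattr(inst, "id") for inst in instances]
        self.linked_ids.extend(ids)  # stand-in for the link-writing call


# Hypothetical context, shaped like the dict stored by _m2m().
bound = RelationQuery(
    {"join_table": "user_groups", "source_col": "user_id",
     "target_col": "group_id", "source_id": 7}
)
bound.add(SimpleNamespace(id=1), SimpleNamespace(id=2))
print(bound.linked_ids)  # [1, 2]

try:
    RelationQuery().add(SimpleNamespace(id=1))
except RuntimeError as exc:
    print(exc)  # '.add()' can only be used on Many-to-Many relationships
```

An instance without an id attribute would raise AttributeError from getattr before any link is written.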

remove(*instances) async

Remove links from a many-to-many relationship

Parameters:

Name Type Description Default
*instances Any

Target model instances that provide an id attribute.

()

Raises:

Type Description
RuntimeError

If the query is not bound to a many-to-many context.

Examples:

>>> user = await User.create(email="taylor@example.com")
>>> admin = await Group.create(name="admin")
>>> await user.groups.remove(admin)
Source code in src/ferro/query/builder.py
async def remove(self, *instances: Any) -> None:
    """Remove links from a many-to-many relationship

    Args:
        *instances: Target model instances that provide an ``id`` attribute.

    Raises:
        RuntimeError: If the query is not bound to a many-to-many context.

    Examples:
        >>> user = await User.create(email="taylor@example.com")
        >>> admin = await Group.create(name="admin")
        >>> await user.groups.remove(admin)
    """
    if not self._m2m_context:
        raise RuntimeError(
            "'.remove()' can only be used on Many-to-Many relationships"
        )

    ids = []
    for inst in instances:
        ids.append(getattr(inst, "id"))

    from ..state import _CURRENT_TRANSACTION

    tx_id, using, session_id = self._transaction_or_using()
    await remove_m2m_links(
        self._m2m_context["join_table"],
        self._m2m_context["source_col"],
        self._m2m_context["target_col"],
        self._m2m_context["source_id"],
        ids,
        tx_id,
        using,
        session_id=session_id,
    )

clear() async

Clear all links in a many-to-many relationship.

Raises:

Type Description
RuntimeError

If the query is not bound to a many-to-many context.

Examples:

>>> user = await User.create(email="taylor@example.com")
>>> await user.groups.clear()
Source code in src/ferro/query/builder.py
async def clear(self) -> None:
    """Clear all links in a many-to-many relationship.

    Raises:
        RuntimeError: If the query is not bound to a many-to-many context.

    Examples:
        >>> user = await User.create(email="taylor@example.com")
        >>> await user.groups.clear()
    """
    if not self._m2m_context:
        raise RuntimeError(
            "'.clear()' can only be used on Many-to-Many relationships"
        )

    from ..state import _CURRENT_TRANSACTION

    tx_id, using, session_id = self._transaction_or_using()
    await clear_m2m_links(
        self._m2m_context["join_table"],
        self._m2m_context["source_col"],
        self._m2m_context["source_id"],
        tx_id,
        using,
        session_id=session_id,
    )
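
The difference between remove() and clear() is easiest to see on a toy join table. The following is a minimal in-memory sketch, not Ferro's implementation: the real work happens in remove_m2m_links and clear_m2m_links, whose internals are not shown here. InMemoryLinks and collect_ids are hypothetical names introduced only for illustration, and the assumption is that a link is a (source_id, target_id) row.

```python
from types import SimpleNamespace
from typing import Any, Iterable


class InMemoryLinks:
    """Hypothetical stand-in for a join table: a set of (source_id, target_id) rows."""

    def __init__(self) -> None:
        self.rows = set()

    def add(self, source_id: int, target_ids: Iterable[int]) -> None:
        # One row per (source, target) pair; in this sketch re-adding a pair is a no-op.
        self.rows.update((source_id, t) for t in target_ids)

    def remove(self, source_id: int, target_ids: Iterable[int]) -> None:
        # Drop only the named pairs that belong to this source.
        self.rows.difference_update((source_id, t) for t in target_ids)

    def clear(self, source_id: int) -> None:
        # Drop every row that belongs to this source, whatever the target.
        self.rows = {row for row in self.rows if row[0] != source_id}


def collect_ids(instances: Iterable[Any]) -> list:
    # Same idea as the listings above: each instance is expected to expose ``id``.
    return [getattr(inst, "id") for inst in instances]


admin = SimpleNamespace(id=10)
staff = SimpleNamespace(id=11)

links = InMemoryLinks()
links.add(1, collect_ids([admin, staff]))  # source 1 linked to both groups
links.add(2, collect_ids([admin]))         # source 2 linked to admin only

links.remove(1, collect_ids([admin]))      # only the (1, 10) row goes away
after_remove = set(links.rows)

links.clear(1)                             # every remaining row for source 1 goes away
after_clear = set(links.rows)
```

In this sketch remove() leaves the other source's link to the same target untouched, and clear() takes no targets at all, which matches the argument lists passed in the listings above (clear() passes no target column or id list).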

__repr__()

Return a developer-friendly representation of the query.

Source code in src/ferro/query/builder.py
def __repr__(self):
    """Return a developer-friendly representation of the query."""
    return f"<Query model={self.model_cls.__name__} where={self.where_clause}>"

col(value)

Treat a model class attribute as a typed query column.

At runtime Ferro's metaclass replaces Model.field with a FieldProxy, so Model.field is already a FieldProxy when accessed on the class. Static type checkers, however, see the field's Pydantic-annotated type (bool, int, ...). That makes expressions like Model.archived == False resolve to bool statically, even though the runtime value is a QueryNode.

At runtime col() accepts only FieldProxy inputs and returns a FieldProxy for the same column, tagged with predicate_style="col". Statically it narrows the return type to FieldProxy[T], so col(Model.archived) == False type-checks as QueryNode. Use it when a single attribute trips your type checker; for new code, prefer the lambda predicate API on Query.where.

Parameters:

Name Type Description Default
value TField

A model class attribute (already a FieldProxy at runtime).

required

Returns:

Type Description
FieldProxy[TField]

A FieldProxy for the same column, statically typed as FieldProxy[T].

Raises:

Type Description
TypeError

If value is not a FieldProxy. This guards against calling col() on a literal (e.g., col(False)), which is almost certainly a bug.

Examples:

>>> rows = await User.where(col(User.archived) == False).all()  # noqa: E712
Source code in src/ferro/query/nodes.py
def col(value: TField) -> "FieldProxy[TField]":
    """Treat a model class attribute as a typed query column.

    At runtime Ferro's metaclass replaces ``Model.field`` with a
    :class:`FieldProxy`, so ``Model.field`` is already a ``FieldProxy`` when
    accessed on the class. Static type checkers, however, see the field's
    Pydantic-annotated type (``bool``, ``int``, ...). That makes expressions
    like ``Model.archived == False`` resolve to ``bool`` statically, even
    though the runtime value is a ``QueryNode``.

    At runtime ``col()`` accepts only ``FieldProxy`` inputs and returns a
    ``FieldProxy`` for the same column, tagged with ``predicate_style="col"``.
    Statically it narrows the return type to ``FieldProxy[T]``, so
    ``col(Model.archived) == False`` type-checks as ``QueryNode``. Use it when
    a single attribute trips your type checker; for new code, prefer the
    lambda predicate API on :meth:`Query.where`.

    Args:
        value: A model class attribute (already a ``FieldProxy`` at runtime).

    Returns:
        A ``FieldProxy`` for the same column, statically typed as ``FieldProxy[T]``.

    Raises:
        TypeError: If ``value`` is not a ``FieldProxy``. This guards against
            calling ``col()`` on a literal (e.g., ``col(False)``), which is
            almost certainly a bug.

    Examples:
        >>> rows = await User.where(col(User.archived) == False).all()  # noqa: E712
    """
    if not isinstance(value, FieldProxy):
        raise TypeError(
            f"col() expects a model column reference (FieldProxy), got {type(value).__name__}"
        )
    return FieldProxy(value.column, predicate_style="col")  # type: ignore[return-value]
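
To make the runtime behaviour of col() concrete without installing Ferro, here is a minimal, self-contained sketch. The QueryNode and FieldProxy classes below are simplified stand-ins written for this example (the real classes in src/ferro/query/nodes.py may carry more state and more operators); only the shape of col() follows the listing above.

```python
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class QueryNode:
    """Stand-in filter node (assumed shape): column, operator, value."""

    column: str
    op: str
    value: Any


class FieldProxy(Generic[T]):
    """Stand-in column reference; only ``==`` is modelled in this sketch."""

    def __init__(self, column: str, predicate_style: str = "operator") -> None:
        self.column = column
        self.predicate_style = predicate_style

    def __eq__(self, other: object) -> QueryNode:  # type: ignore[override]
        # Comparing a column builds a filter node instead of returning a bool.
        return QueryNode(self.column, "==", other)


def col(value: T) -> FieldProxy[T]:
    # Reject literals such as col(False); only column references make sense.
    if not isinstance(value, FieldProxy):
        raise TypeError(
            f"col() expects a model column reference (FieldProxy), got {type(value).__name__}"
        )
    # A new proxy for the same column, tagged as coming through col().
    return FieldProxy(value.column, predicate_style="col")  # type: ignore[return-value]


archived = FieldProxy("archived")      # what Model.archived would be at runtime
wrapped = col(archived)
node = wrapped == False  # noqa: E712  # builds a QueryNode, not a bool

try:
    col(False)
    literal_rejected = False
except TypeError:
    literal_rejected = True
```

The point of the sketch is the split between the two layers: the isinstance guard is the only runtime check, while the FieldProxy[T] return annotation is what lets a type checker treat the comparison as a QueryNode.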

QueryProxy

Bases: Generic[TModel]

Lazy attribute proxy used by lambda predicates passed to Query.where.

A fresh QueryProxy is constructed each time a lambda predicate is evaluated. Any attribute access returns a FieldProxy for the accessed name, so lambda t: t.archived == False builds a QueryNode without ever asking the model class what type archived is. The TModel type parameter exists so user-supplied lambdas can narrow t to a specific model in static analysis; the proxy itself ignores the parameter at runtime.

The proxy attribute return type is intentionally FieldProxy[Any] for now: wiring per-field types through a lambda parameter requires @dataclass_transform plumbing on the metaclass, which is outside this feature's scope.

Examples:

>>> rows = await User.where(lambda t: t.archived == False).all()  # noqa: E712
Source code in src/ferro/query/nodes.py
class QueryProxy(Generic[TModel]):
    """Lazy attribute proxy used by lambda predicates passed to ``Query.where``.

    A fresh ``QueryProxy`` is constructed each time a lambda predicate is
    evaluated. Any attribute access returns a :class:`FieldProxy` for the
    accessed name, so ``lambda t: t.archived == False`` builds a
    :class:`QueryNode` without ever asking the model class what type
    ``archived`` is. The ``TModel`` type parameter exists so user-supplied
    lambdas can narrow ``t`` to a specific model in static analysis; the
    proxy itself ignores the parameter at runtime.

    The proxy attribute return type is intentionally ``FieldProxy[Any]`` for
    now — wiring per-field types through a lambda parameter requires
    ``@dataclass_transform`` plumbing on the metaclass, which is outside this
    feature's scope.

    Examples:
        >>> rows = await User.where(lambda t: t.archived == False).all()  # noqa: E712
    """

    __slots__ = ()

    def __getattr__(self, name: str) -> "FieldProxy[Any]":
        """Return a fresh ``FieldProxy`` for any attribute name."""
        return FieldProxy(name, predicate_style="lambda")

Attributes

__slots__ = () class-attribute instance-attribute

Functions

__getattr__(name)

Return a fresh FieldProxy for any attribute name.

Source code in src/ferro/query/nodes.py
def __getattr__(self, name: str) -> "FieldProxy[Any]":
    """Return a fresh ``FieldProxy`` for any attribute name."""
    return FieldProxy(name, predicate_style="lambda")
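
The way a lambda predicate turns into a filter node can be sketched in a few lines. This is an illustration under stated assumptions, not Ferro's code: QueryNode and FieldProxy are simplified stand-ins, and resolve() is a hypothetical helper standing in for whatever Query.where does internally when it receives a callable.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class QueryNode:
    """Stand-in filter node (assumed shape): column, operator, value."""

    column: str
    op: str
    value: Any


class FieldProxy:
    """Stand-in column reference supporting two comparison operators."""

    def __init__(self, column: str) -> None:
        self.column = column

    def __eq__(self, other: object) -> QueryNode:  # type: ignore[override]
        return QueryNode(self.column, "==", other)

    def __ge__(self, other: object) -> QueryNode:
        return QueryNode(self.column, ">=", other)


class QueryProxy:
    """Any attribute name resolves to a column reference with that name."""

    __slots__ = ()

    def __getattr__(self, name: str) -> FieldProxy:
        # Called only when normal lookup fails, which is always the case
        # for field names because the proxy stores no attributes.
        return FieldProxy(name)


# Same shape as the Predicate alias, minus the TModel parameter.
Predicate = Callable[[QueryProxy], QueryNode]


def resolve(predicate: Predicate) -> QueryNode:
    # Hypothetical helper: evaluate the lambda against a fresh proxy.
    return predicate(QueryProxy())


adults = resolve(lambda t: t.age >= 18)
active = resolve(lambda t: t.archived == False)  # noqa: E712
```

Because the proxy answers for any name, a misspelled field is not caught at this stage in the sketch; it would only surface later, when the node is turned into a query.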

Predicate = Callable[[QueryProxy[TModel]], QueryNode] module-attribute

Type alias for lambda predicates accepted by Query.where.