Queries¶
Ferro provides a fluent, type-safe API for constructing and executing database queries. All queries are constructed in Python and executed by the high-performance Rust engine.
Fetch by primary key¶
Model.get(pk) loads exactly one row by primary key and returns your model type (not YourModel | None). If no row exists, Ferro raises ModelDoesNotExist, a subclass of LookupError with .model and .pk set—useful for HTTP 404s or structured logging.
When a missing row is a normal outcome, use Model.get_or_none(pk), which returns YourModel | None and never raises for “not found”. The same pair exists on Model.using("name") for named connections.
from ferro import ModelDoesNotExist
user = await User.get(42)
try:
user = await User.get(client_supplied_id)
except ModelDoesNotExist:
... # e.g. return 404 from your HTTP layer
draft = await User.get_or_none(999) # None if no such row
replica_view = await User.using("replica").get_or_none(1)
Lazy forward relations (e.g. await post.author) use optional fetch internally so a broken or missing FK still resolves to None instead of raising.
Basic Filtering¶
Use standard Python comparison operators on model fields to create filter conditions:
# Equality
users = await User.where(User.is_active == True).all()
# Comparison
adults = await User.where(User.age >= 18).all()
seniors = await User.where(User.age > 65).all()
# String matching
alice_users = await User.where(User.name.like("Alice%")).all()
Supported Operators¶
| Operator | SQL Equivalent | Example |
|---|---|---|
== |
= |
User.status == "active" |
!= |
!= or <> |
User.role != "admin" |
> |
> |
User.age > 18 |
>= |
>= |
User.age >= 21 |
< |
< |
User.score < 100 |
<= |
<= |
User.score <= 50 |
.like() |
LIKE |
User.email.like("%@example.com") |
.in_() |
IN |
User.status.in_(["active", "pending"]) |
Predicate Styles¶
where() accepts three interchangeable predicate styles. They share one runtime path and one dispatcher, and you can mix them on a single chain.
from ferro.query import col
# 1. Operator (the original)
await User.where(User.id == 1).all()
# 2. col() wrapper — runtime identity, statically narrows to FieldProxy[T]
await User.where(col(User.archived) == False).all()
# 3. Lambda predicate — receives a QueryProxy with FieldProxy attributes
await User.where(lambda t: t.archived == False).all()
The operator style works at runtime for every column type, but static type checkers (Pyright, ty) flag boolean and other value-type comparisons because they see your Pydantic annotations rather than the runtime FieldProxy. Reach for col() when one attribute trips the checker, or write new code with the lambda style to sidestep the issue entirely.
When to use which
- Operator — existing code that already type-checks; quick filters where the column type isn't
bool. col()— one attribute on an existing chain trips your type checker and you want minimal diff.- Lambda — new code, especially boolean comparisons or compound predicates. Recommended idiom going forward.
See Typed Query Predicates for the full treatment, including combined-style chains and the deliberate scope boundaries.
Logical Operators¶
Combine conditions with & (AND) and | (OR). Always use parentheses around each condition:
# AND
query = User.where((User.age > 21) & (User.status == "active"))
# OR
query = User.where((User.role == "admin") | (User.role == "moderator"))
# Complex: (age > 21 AND status == 'active') OR role == 'admin'
query = User.where(
((User.age > 21) & (User.status == "active")) | (User.role == "admin")
)
# NOT with !=
inactive_users = await User.where(User.is_active != True).all()
The same compound expressions work inside lambda predicates — useful when you want the whole expression to type-check without col():
Chaining¶
Methods can be chained to build complex queries incrementally:
results = await Product.select() \
.where(Product.category == "Electronics") \
.where(Product.price < 1000) \
.order_by(Product.price, "desc") \
.limit(10) \
.offset(5) \
.all()
Multiple .where() calls are combined with AND.
Ordering¶
Sort results with .order_by():
# Single field, ascending (default)
users = await User.order_by(User.created_at).all()
# Single field, descending
users = await User.order_by(User.created_at, "desc").all()
# Multiple fields
products = await Product.order_by(Product.category) \
.order_by(Product.price, "desc") \
.all()
Limiting and Offsetting¶
Limit¶
Restrict the number of results:
# Get first 10 users
users = await User.limit(10).all()
# Get top 5 highest-scoring players
top_players = await Player.order_by(Player.score, "desc").limit(5).all()
Offset¶
Skip a number of results (useful for pagination):
# Skip first 10, get next 10
users = await User.order_by(User.id).offset(10).limit(10).all()
# Page 3 (20 per page): skip 40, take 20
page_3 = await Product.offset(40).limit(20).all()
For better pagination patterns, see How-To: Pagination.
Terminal Operations¶
These methods execute the query and return results:
.all()¶
Returns all matching records as a list:
.first()¶
Returns the first matching record or None:
admin = await User.where(User.role == "admin").first()
# Returns: User | None
if admin:
print(f"Admin: {admin.username}")
.count()¶
Returns the total number of matching records:
active_count = await User.where(User.is_active == True).count()
# Returns: int
print(f"Active users: {active_count}")
.exists()¶
Returns True if at least one matching record exists:
has_admin = await User.where(User.role == "admin").exists()
# Returns: bool
if not has_admin:
print("Warning: No admin users found!")
Performance
Use .exists() instead of .count() > 0. It's more efficient because the database can stop after finding the first match.
Aggregations¶
Currently, only .count() is implemented:
Feature Not Implemented
Aggregation functions like sum(), avg(), min(), max() are not yet available. See Coming Soon for more information.
Selecting Specific Fields¶
Feature Not Implemented
Selecting specific fields is not yet available. Ferro currently loads all model fields. See Coming Soon for more information.
Working with Relationships¶
Forward Relations¶
Access foreign keys:
Reverse Relations¶
Query the reverse side:
author = await User.where(User.username == "alice").first()
# Get all posts by author
author_posts = await author.posts.all()
# Filter reverse relation (any of the three predicate styles works)
published_posts = await author.posts.where(Post.published == True).all()
published_posts = await author.posts.where(lambda t: t.published == True).all()
# Count reverse relation
post_count = await author.posts.count()
Eager Loading¶
Feature Not Implemented
Eager loading with prefetch_related() is not yet available. See Coming Soon for current workarounds.
Advanced Filtering¶
NULL Checks¶
# Find records with NULL field
users_no_phone = await User.where(User.phone == None).all()
# Find records with non-NULL field
users_with_phone = await User.where(User.phone != None).all()
IN Queries¶
# Using .in_()
active_statuses = ["active", "pending", "verified"]
users = await User.where(User.status.in_(active_statuses)).all()
Feature Not Implemented
The not_in_() method is not yet available. See Coming Soon for workarounds using != with &.
LIKE Patterns¶
# Starts with
gmail_users = await User.where(User.email.like("%.gmail.com")).all()
# Contains
smith_users = await User.where(User.name.like("%Smith%")).all()
Feature Not Implemented
Case-insensitive ilike() is not yet available. See Coming Soon for workarounds.
Raw SQL¶
Ferro exposes execute, fetch_all, and fetch_one for raw SQL escape hatches. Raw SQL uses backend-native placeholders and can route to named connections:
from ferro import execute, fetch_all
await execute("select run_pipeline_job($1)", job_id, using="service")
rows = await fetch_all("select id, name from users where org_id = $1", org_id)
Inside transaction(using="service"), raw SQL inherits the transaction connection. See Raw SQL for bind-type details and caveats.
Performance Tips¶
Use .exists() for Checks¶
# Bad (loads full count)
if await User.where(User.email == email).count() > 0:
raise ValueError("Email already exists")
# Good (stops at first match)
if await User.where(User.email == email).exists():
raise ValueError("Email already exists")
Use Indexes¶
Add indexes to frequently filtered fields:
from ferro import Field, Model
class User(Model):
email: str = Field(unique=True, index=True)
status: str = Field(index=True)
Batch Operations¶
Use bulk methods instead of loops:
# Bad (N queries)
for user in users:
user.is_active = False
await user.save()
# Good (1 query)
await User.where(User.id.in_([u.id for u in users])).update(is_active=False)
Avoid N+1 Queries¶
Be aware of N+1 query patterns:
# This causes N+1 queries (one for posts, then one per post for author)
posts = await Post.all()
for post in posts:
author = await post.author # Separate query for each post!
Feature Not Implemented
Eager loading / prefetching is not yet available. See Coming Soon for more information. Be mindful of N+1 patterns and load relationships efficiently.
SQL Injection Protection¶
All values passed to the query API are automatically parameterized by the Rust engine. User input is never concatenated into SQL strings:
# Safe - parameterized automatically
username = request.get("username") # User input
user = await User.where(User.username == username).first()
# Generates: SELECT * FROM users WHERE username = $1
# With parameter: [username]
See Also¶
- Mutations - Creating, updating, and deleting records
- Relationships - Working with foreign keys
- How-To: Pagination - Efficient pagination patterns
- Performance - Query optimization techniques