Quickstart
In about 10 minutes you'll build a small blog backend: two related models (Author and Post), an in-memory SQLite database, and every core Ferro operation — create, query, traverse relationships, update, delete, and transactions.
Every code block on this page comes from one runnable script, shown in full at the bottom of the page. Follow along in a file of your own, or just run the script.
Define Your Models
Ferro supports two equivalent field-declaration styles — options on the assignment side, or inside typing.Annotated. Every model example in these docs shows both; pick one and stay consistent in your project.
# Style 1: field options on the assignment side
from datetime import datetime
from typing import Annotated

from ferro import BackRef, Field, ForeignKey, Model, Relation, connect, transaction


class Author(Model):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    email: str = Field(unique=True)
    posts: Relation[list["Post"]] = BackRef()


class Post(Model):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    body: str
    published: bool = False
    created_at: datetime = Field(default_factory=datetime.now)
    author: Annotated[Author, ForeignKey(related_name="posts")]
# Style 2: field options inside typing.Annotated
from datetime import datetime
from typing import Annotated

from ferro import BackRef, Field, ForeignKey, Model, Relation, connect, transaction


class Author(Model):
    id: Annotated[int | None, Field(default=None, primary_key=True)]
    name: str
    email: Annotated[str, Field(unique=True)]
    posts: Relation[list["Post"]] = BackRef()


class Post(Model):
    id: Annotated[int | None, Field(default=None, primary_key=True)]
    title: str
    body: str
    published: bool = False
    created_at: Annotated[datetime, Field(default_factory=datetime.now)]
    author: Annotated[Author, ForeignKey(related_name="posts")]
A Ferro model is a Pydantic model — annotated fields become columns, and defaults work exactly as in Pydantic:
- Field(default=None, primary_key=True) marks id as the primary key. It's int | None because the database assigns it on insert.
- Field(unique=True) adds a unique constraint; default_factory=datetime.now gives each post a creation timestamp.
- Annotated[Author, ForeignKey(related_name="posts")] declares the many-to-one side: each Post stores an author_id column pointing at an Author.
- Relation[list["Post"]] = BackRef() is the reverse side: author.posts becomes a chainable query for that author's posts. related_name="posts" is what links the two.
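To make the column mapping concrete, here is a rough sketch of the kind of schema these two models describe, written against Python's built-in sqlite3 module rather than Ferro. The table names, column types, and DDL are assumptions for illustration; the schema Ferro actually generates may differ.

```python
import sqlite3

# Hypothetical DDL approximating the Author and Post models; not Ferro's actual output.
SCHEMA = """
CREATE TABLE author (
    id    INTEGER PRIMARY KEY,   -- assigned by the database on insert
    name  TEXT NOT NULL,
    email TEXT NOT NULL UNIQUE   -- corresponds to Field(unique=True)
);
CREATE TABLE post (
    id         INTEGER PRIMARY KEY,
    title      TEXT NOT NULL,
    body       TEXT NOT NULL,
    published  INTEGER NOT NULL DEFAULT 0,
    created_at TEXT NOT NULL,
    author_id  INTEGER NOT NULL REFERENCES author(id)  -- the foreign key column
);
"""

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when asked
conn.executescript(SCHEMA)

cur = conn.execute(
    "INSERT INTO author (name, email) VALUES (?, ?)", ("Alice", "alice@example.com")
)
alice_id = cur.lastrowid  # the database-assigned primary key

# A second author with the same email violates the unique constraint.
try:
    conn.execute(
        "INSERT INTO author (name, email) VALUES (?, ?)", ("Alice 2", "alice@example.com")
    )
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
```

The id starts out unknown on the Python side and is filled in from the database after the insert, which is why the model declares it as int | None.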
Connect
await connect("sqlite::memory:", auto_migrate=True)
connect() takes a database URL. sqlite::memory: gives you a throwaway in-memory database — perfect for this tutorial and for tests. For a file-backed database use sqlite:app.db?mode=rwc (rwc = read/write/create), or a postgres://... URL for PostgreSQL.
auto_migrate=True creates tables for every registered model on connect. It's great for development; for production schemas, use Alembic migrations.
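SQLite's own URI filenames accept the same mode values, so the difference between a throwaway in-memory database and a file opened with mode=rwc can be seen with the standard library alone. This is a sketch using sqlite3 directly, not Ferro's connect(); the file name app.db and the temporary directory are illustrative, and how Ferro passes the URL through to SQLite is not shown on this page.

```python
import os
import sqlite3
import tempfile

# Each plain ":memory:" connection gets its own private, throwaway database.
first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")
first.execute("CREATE TABLE note (text TEXT)")
tables_in_second = second.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()

# mode=rw refuses to open a file that does not exist yet...
db_path = os.path.join(tempfile.mkdtemp(), "app.db")
try:
    sqlite3.connect(f"file:{db_path}?mode=rw", uri=True)
    rw_opened_missing_file = True
except sqlite3.OperationalError:
    rw_opened_missing_file = False

# ...while mode=rwc (read/write/create) creates it on demand.
file_conn = sqlite3.connect(f"file:{db_path}?mode=rwc", uri=True)
file_conn.execute("CREATE TABLE note (text TEXT)")
file_conn.commit()
file_conn.close()
file_created = os.path.exists(db_path)
```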
Create Data
alice = await Author.create(name="Alice", email="alice@example.com")

post = await Post.create(
    title="Why Ferro is Fast",
    body="Ferro hands SQL generation and row hydration to a Rust engine...",
    published=True,
    author=alice,
)

# Insert many rows in a single statement
await Post.bulk_create(
    [
        Post(title="Async Patterns", body="...", published=True, author_id=alice.id),
        Post(title="Unfinished Draft", body="...", author_id=alice.id),
    ]
)
- Model.create(...) validates the data, inserts one row, and returns the instance with its database-assigned id populated. Notice you can pass a model instance (author=alice) for the foreign key.
- Model.bulk_create([...]) inserts many rows in a single statement; use it whenever you're loading more than a handful of rows. Here we set author_id directly instead of passing the instance.
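As an illustration of what "many rows in a single statement" can mean at the SQL level, the sketch below builds one multi-row INSERT with the standard sqlite3 module. The table layout is a simplified assumption, and this page does not show how Ferro's engine actually batches rows, so treat it as the general idea only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT, published INTEGER)"
)

rows = [("Async Patterns", 1), ("Unfinished Draft", 0)]

# Build one INSERT with a "(?, ?)" group per row, then flatten the parameters.
placeholders = ", ".join("(?, ?)" for _ in rows)
params = [value for row in rows for value in row]
statement = f"INSERT INTO post (title, published) VALUES {placeholders}"
conn.execute(statement, params)

inserted = conn.execute("SELECT COUNT(*) FROM post").fetchone()[0]
```

One statement means one round trip to the database regardless of how many rows are in the list, which is the reason to prefer a bulk call over a loop of single inserts.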
Query
# Fetch by primary key
same_post = await Post.get(post.id)

# Filter, order, and slice
published = (
    await Post.where(lambda t: t.published == True)  # noqa: E712
    .order_by(Post.created_at, "desc")
    .limit(10)
    .all()
)

# Aggregate terminals
total = await Post.select().count()
has_drafts = await Post.where(lambda t: t.published == False).exists()  # noqa: E712
- Post.get(pk) fetches one row by primary key.
- where(...) filters, order_by(...) sorts, limit(...) slices, and nothing touches the database until a terminal like .all(), .first(), .count(), or .exists() runs the query.
- lambda t: t.published == True is a lambda predicate, the officially recommended query style. Two other styles exist for compatibility; see Queries for the comparison.
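The "nothing runs until a terminal" behaviour can be sketched with a toy builder on top of sqlite3. The LazyQuery class below is hypothetical and is not Ferro's implementation: it is synchronous, and it takes SQL fragments instead of lambda predicates. It only shows how chained calls can record intent while a single terminal method issues the SQL.

```python
import sqlite3


class LazyQuery:
    """Toy query builder: chained calls record intent, terminals run SQL."""

    def __init__(self, conn, table, where=None, params=(), order=None, limit=None):
        self.conn = conn
        self.table = table
        self._where = where
        self._params = tuple(params)
        self._order = order
        self._limit = limit

    def _clone(self, **changes):
        # Return a new builder so partially built queries can be reused safely.
        state = {
            "where": self._where,
            "params": self._params,
            "order": self._order,
            "limit": self._limit,
        }
        state.update(changes)
        return LazyQuery(self.conn, self.table, **state)

    def where(self, clause, *params):
        return self._clone(where=clause, params=params)

    def order_by(self, column, direction="asc"):
        return self._clone(order=f"{column} {direction.upper()}")

    def limit(self, n):
        return self._clone(limit=int(n))

    def all(self):
        # Terminal: the only method here that touches the database.
        sql = f"SELECT * FROM {self.table}"
        if self._where:
            sql += f" WHERE {self._where}"
        if self._order:
            sql += f" ORDER BY {self._order}"
        if self._limit is not None:
            sql += f" LIMIT {self._limit}"
        return self.conn.execute(sql, self._params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT, published INTEGER)"
)
conn.executemany(
    "INSERT INTO post (title, published) VALUES (?, ?)",
    [("A", 1), ("B", 0), ("C", 1)],
)

executed = []
conn.set_trace_callback(executed.append)  # record every statement SQLite runs

query = LazyQuery(conn, "post").where("published = ?", 1).order_by("id", "desc").limit(10)
statements_before_terminal = len(executed)  # building the query ran nothing
titles = [row[1] for row in query.all()]
statements_after_terminal = len(executed)
```

Because each chained call returns a new builder, a base query can be kept around and extended in different directions without the variants interfering with each other.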
What happened
Thanks to Ferro's identity map, Post.get(post.id) returns the same Python object as the post you created earlier — not a duplicate copy. One row, one instance.
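For readers new to the pattern, an identity map can be sketched as a registry keyed by model class and primary key. The IdentityMap class, the weak-reference storage, and the choice to skip loading on a hit are all assumptions of this toy; how Ferro implements its identity map is not described on this page.

```python
import weakref


class Post:
    def __init__(self, id, title):
        self.id = id
        self.title = title


class IdentityMap:
    """Toy registry keyed by (model class, primary key)."""

    def __init__(self):
        # Weak values let instances be garbage-collected once nothing else uses them.
        self._instances = weakref.WeakValueDictionary()

    def get_or_load(self, model, pk, load_row):
        key = (model, pk)
        instance = self._instances.get(key)
        if instance is None:
            instance = model(**load_row(pk))  # load only on a miss
            self._instances[key] = instance
        return instance


ROWS = {1: {"id": 1, "title": "Why Ferro is Fast"}}  # stand-in for a database table
loads = []


def load_row(pk):
    loads.append(pk)
    return ROWS[pk]


identity_map = IdentityMap()
first = identity_map.get_or_load(Post, 1, load_row)
second = identity_map.get_or_load(Post, 1, load_row)
```

Here first and second are the same object, so a change made through one name is visible through the other.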
Work with Relationships
# Forward access: awaiting the foreign key loads the related instance
author = await same_post.author
# Reverse access: the BackRef is a chainable query
alice_posts = await author.posts.where(lambda t: t.published == True).all() # noqa: E712
Two directions, two idioms:
- Forward (post.author): awaiting the foreign key attribute loads the related Author.
- Reverse (author.posts): the BackRef is a query, so you can chain .where(), .order_by(), and friends before awaiting it.
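In SQL terms, the two directions correspond roughly to the lookups below, shown with the standard sqlite3 module. The table and column names are assumptions based on the author_id column described earlier, and the queries Ferro actually issues may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # access columns by name
conn.executescript(
    """
    CREATE TABLE author (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE post (
        id INTEGER PRIMARY KEY,
        title TEXT,
        published INTEGER,
        author_id INTEGER REFERENCES author(id)
    );
    INSERT INTO author (id, name) VALUES (1, 'Alice');
    INSERT INTO post (title, published, author_id) VALUES
        ('Why Ferro is Fast', 1, 1),
        ('Async Patterns', 1, 1),
        ('Unfinished Draft', 0, 1);
    """
)

post = conn.execute(
    "SELECT * FROM post WHERE title = ?", ("Why Ferro is Fast",)
).fetchone()

# Forward: follow the stored author_id to exactly one author row.
author = conn.execute(
    "SELECT * FROM author WHERE id = ?", (post["author_id"],)
).fetchone()

# Reverse: filter posts by author_id, plus whatever extra conditions were chained.
published_titles = [
    row["title"]
    for row in conn.execute(
        "SELECT title FROM post WHERE author_id = ? AND published = 1 ORDER BY id",
        (author["id"],),
    )
]
```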
Update & Delete
# Update one instance
post.title = "Why Ferro is *Really* Fast"
await post.save()
# Update many rows at once
updated = await Post.where(lambda t: t.published == False).update(published=True) # noqa: E712
# Delete
deleted = await Post.where(lambda t: t.title == "Unfinished Draft").delete()
- For a single instance: mutate attributes, then await post.save().
- For many rows: chain .update(field=value) or .delete() onto a where() query. Both return the number of affected rows.
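The affected-row count is something the database itself reports. As a sketch with the standard sqlite3 module (not Ferro), the cursor's rowcount attribute plays that role for UPDATE and DELETE; the simplified post table is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT, published INTEGER)"
)
conn.executemany(
    "INSERT INTO post (title, published) VALUES (?, ?)",
    [("Why Ferro is Fast", 1), ("Async Patterns", 1), ("Unfinished Draft", 0)],
)

# Bulk update: only the one unpublished row matches the filter.
updated = conn.execute("UPDATE post SET published = 1 WHERE published = 0").rowcount

# Bulk delete: one row has this title.
deleted = conn.execute(
    "DELETE FROM post WHERE title = ?", ("Unfinished Draft",)
).rowcount

remaining = conn.execute("SELECT COUNT(*) FROM post").fetchone()[0]
```

A count of zero is a cheap way to notice that a filter matched nothing, without issuing a separate query first.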
Wrap It in a Transaction
async with transaction():
    bob = await Author.create(name="Bob", email="bob@example.com")
    await Post.create(title="Hello", body="...", author=bob)
# Commits on success, rolls back if the block raises
Everything inside async with transaction(): commits together when the block exits cleanly — and rolls back entirely if it raises. Use it whenever multiple writes must succeed or fail as one. More in Transactions.
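The all-or-nothing behaviour is easiest to see on the failure path. The sketch below uses the standard sqlite3 module, whose connection object works as a context manager that commits on a clean exit and rolls back on an exception; it is a stand-in for the idea, not Ferro's transaction() itself, and the simplified author table is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE author (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")
conn.commit()


def author_count():
    return conn.execute("SELECT COUNT(*) FROM author").fetchone()[0]


# Clean exit: both inserts commit together.
with conn:
    conn.execute("INSERT INTO author (email) VALUES (?)", ("alice@example.com",))
    conn.execute("INSERT INTO author (email) VALUES (?)", ("bob@example.com",))
count_after_commit = author_count()

# The second insert violates the unique constraint, so the first is rolled back too.
try:
    with conn:
        conn.execute("INSERT INTO author (email) VALUES (?)", ("carol@example.com",))
        conn.execute("INSERT INTO author (email) VALUES (?)", ("alice@example.com",))
except sqlite3.IntegrityError:
    pass
count_after_rollback = author_count()
```

The row for carol@example.com never becomes visible, even though its own insert succeeded, because it shared a transaction with the insert that failed.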
Complete Script
The whole tutorial as one runnable file — it lives in the repo at docs/examples/quickstart.py:
"""Runnable companion to the Quickstart tutorial (docs/pages/getting-started/quickstart.md)."""
import asyncio
from datetime import datetime
from typing import Annotated

from ferro import BackRef, Field, ForeignKey, Model, Relation, connect, transaction


class Author(Model):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    email: str = Field(unique=True)
    posts: Relation[list["Post"]] = BackRef()


class Post(Model):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    body: str
    published: bool = False
    created_at: datetime = Field(default_factory=datetime.now)
    author: Annotated[Author, ForeignKey(related_name="posts")]


async def main() -> None:
    await connect("sqlite::memory:", auto_migrate=True)

    alice = await Author.create(name="Alice", email="alice@example.com")

    post = await Post.create(
        title="Why Ferro is Fast",
        body="Ferro hands SQL generation and row hydration to a Rust engine...",
        published=True,
        author=alice,
    )

    # Insert many rows in a single statement
    await Post.bulk_create(
        [
            Post(title="Async Patterns", body="...", published=True, author_id=alice.id),
            Post(title="Unfinished Draft", body="...", author_id=alice.id),
        ]
    )

    assert post.id is not None

    # Fetch by primary key
    same_post = await Post.get(post.id)

    # Filter, order, and slice
    published = (
        await Post.where(lambda t: t.published == True)  # noqa: E712
        .order_by(Post.created_at, "desc")
        .limit(10)
        .all()
    )

    # Aggregate terminals
    total = await Post.select().count()
    has_drafts = await Post.where(lambda t: t.published == False).exists()  # noqa: E712

    assert same_post is post  # identity map: same Python object
    assert len(published) == 2
    assert total == 3
    assert has_drafts

    # Forward access: awaiting the foreign key loads the related instance
    author = await same_post.author

    # Reverse access: the BackRef is a chainable query
    alice_posts = await author.posts.where(lambda t: t.published == True).all()  # noqa: E712

    assert author.name == "Alice"
    assert len(alice_posts) == 2

    # Update one instance
    post.title = "Why Ferro is *Really* Fast"
    await post.save()

    # Update many rows at once
    updated = await Post.where(lambda t: t.published == False).update(published=True)  # noqa: E712

    # Delete
    deleted = await Post.where(lambda t: t.title == "Unfinished Draft").delete()

    assert updated == 1
    assert deleted == 1

    async with transaction():
        bob = await Author.create(name="Bob", email="bob@example.com")
        await Post.create(title="Hello", body="...", author=bob)
    # Commits on success, rolls back if the block raises

    assert await Author.select().count() == 2

    print("quickstart example ran successfully")


if __name__ == "__main__":
    asyncio.run(main())
What's Next
- Next Steps — pick a path based on what you're building
- Models & Fields — every field type and constraint
- Queries — the lambda predicate style in depth, ordering, slicing, terminals
- Relationships — foreign keys, back-references, many-to-many