Quickstart

In about 10 minutes you'll build a small blog backend: two related models (Author and Post), an in-memory SQLite database, and every core Ferro operation — create, query, traverse relationships, update, delete, and transactions.

Every code block on this page comes from one runnable script, shown in full at the bottom of the page. Follow along in a file of your own, or just run the script.

Define Your Models

Ferro supports two equivalent field-declaration styles — options on the assignment side, or inside typing.Annotated. Every model example in these docs shows both; pick one and stay consistent in your project.

from datetime import datetime
from typing import Annotated

from ferro import BackRef, Field, ForeignKey, Model, Relation, connect, transaction


class Author(Model):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    email: str = Field(unique=True)
    posts: Relation[list["Post"]] = BackRef()


class Post(Model):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    body: str
    published: bool = False
    created_at: datetime = Field(default_factory=datetime.now)
    author: Annotated[Author, ForeignKey(related_name="posts")]

The same models with the options inside typing.Annotated:

from datetime import datetime
from typing import Annotated

from ferro import BackRef, Field, ForeignKey, Model, Relation, connect, transaction


class Author(Model):
    id: Annotated[int | None, Field(default=None, primary_key=True)]
    name: str
    email: Annotated[str, Field(unique=True)]
    posts: Relation[list["Post"]] = BackRef()


class Post(Model):
    id: Annotated[int | None, Field(default=None, primary_key=True)]
    title: str
    body: str
    published: bool = False
    created_at: Annotated[datetime, Field(default_factory=datetime.now)]
    author: Annotated[Author, ForeignKey(related_name="posts")]

A Ferro model is a Pydantic model — annotated fields become columns, and defaults work exactly as in Pydantic:

  • Field(default=None, primary_key=True) marks id as the primary key. It's int | None because the database assigns it on insert, so id stays None until the row has been saved.
  • Field(unique=True) adds a unique constraint; default_factory=datetime.now gives each post a creation timestamp.
  • Annotated[Author, ForeignKey(related_name="posts")] declares the many-to-one side: each Post stores an author_id column pointing at an Author.
  • Relation[list["Post"]] = BackRef() is the reverse side: author.posts becomes a chainable query for that author's posts. related_name="posts" is what links the two.
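
If you are curious how options placed inside typing.Annotated can be read back at all, the sketch below uses only the standard library. It is not Ferro's implementation: Unique, Account, and field_options are hypothetical names invented for this illustration, with Unique standing in for an option such as Field(unique=True).

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Unique:
    """Hypothetical marker standing in for an option such as Field(unique=True)."""


class Account:
    name: str
    email: Annotated[str, Unique()]


def field_options(cls: type) -> dict[str, tuple]:
    """Map each annotated attribute to the metadata attached via Annotated."""
    # include_extras=True keeps the Annotated wrapper instead of stripping it.
    hints = get_type_hints(cls, include_extras=True)
    options = {}
    for name, hint in hints.items():
        if get_origin(hint) is Annotated:
            # get_args() yields the base type first, then the metadata items.
            _base, *metadata = get_args(hint)
            options[name] = tuple(metadata)
        else:
            options[name] = ()
    return options


print(field_options(Account))
```

The plain annotation carries no metadata, while the Annotated one exposes its marker. That is why an option can live in the type annotation and leave the assignment side free.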

Connect

    await connect("sqlite::memory:", auto_migrate=True)

connect() takes a database URL. sqlite::memory: gives you a throwaway in-memory database — perfect for this tutorial and for tests. For a file-backed database use sqlite:app.db?mode=rwc (rwc = read/write/create), or a postgres://... URL for PostgreSQL.
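
To see what "throwaway" means in practice, here is a small illustration using Python's built-in sqlite3 module rather than Ferro. The standard library spells the in-memory database ":memory:", not the "sqlite::memory:" URL shown above, and the table name is made up for the example. How Ferro's engine manages connections to an in-memory database is not covered here.

```python
import sqlite3

# Each connection to ":memory:" gets its own private, empty database.
first = sqlite3.connect(":memory:")
first.execute("CREATE TABLE author (id INTEGER PRIMARY KEY, name TEXT)")
first.execute("INSERT INTO author (name) VALUES ('Alice')")

second = sqlite3.connect(":memory:")

rows_in_first = first.execute("SELECT name FROM author").fetchall()
tables_in_second = second.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()

print(rows_in_first)     # the row inserted through the first connection
print(tables_in_second)  # empty: the second database never saw the table

first.close()   # the first database and its data are gone after this
second.close()
```

Nothing is written to disk, so every run starts from a clean slate. That makes in-memory databases convenient for tutorials and tests.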

auto_migrate=True creates tables for every registered model on connect. It's great for development; for production schemas, use Alembic migrations.

Create Data

    alice = await Author.create(name="Alice", email="alice@example.com")

    post = await Post.create(
        title="Why Ferro is Fast",
        body="Ferro hands SQL generation and row hydration to a Rust engine...",
        published=True,
        author=alice,
    )

    # Insert many rows in a single statement
    await Post.bulk_create(
        [
            Post(title="Async Patterns", body="...", published=True, author_id=alice.id),
            Post(title="Unfinished Draft", body="...", author_id=alice.id),
        ]
    )

  • Model.create(...) validates the data, inserts one row, and returns the instance with its database-assigned id populated. Notice you can pass a model instance (author=alice) for the foreign key.
  • Model.bulk_create([...]) inserts many rows in a single statement — use it whenever you're loading more than a handful of rows. Here we set author_id directly instead of passing the instance.

Query

    # Fetch by primary key
    same_post = await Post.get(post.id)

    # Filter, order, and slice
    published = (
        await Post.where(lambda t: t.published == True)  # noqa: E712
        .order_by(Post.created_at, "desc")
        .limit(10)
        .all()
    )

    # Aggregate terminals
    total = await Post.select().count()
    has_drafts = await Post.where(lambda t: t.published == False).exists()  # noqa: E712

  • Post.get(pk) fetches one row by primary key.
  • where(...) filters, order_by(...) sorts, limit(...) slices — and nothing touches the database until a terminal like .all(), .first(), .count(), or .exists() runs the query.
  • lambda t: t.published == True is a lambda predicate — the officially recommended query style. Two other styles exist for compatibility; see Queries for the comparison.
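
The "nothing touches the database until a terminal runs" behaviour can be sketched with a tiny builder over the standard sqlite3 module. This is an illustration of the general pattern, not Ferro's engine: LazyQuery, where_published, and the executed log are hypothetical names, and the sketch is synchronous where Ferro is async.

```python
import sqlite3
from dataclasses import dataclass, replace
from typing import Optional

executed = []  # every SQL string that actually reaches the database


@dataclass(frozen=True)
class LazyQuery:
    """Hypothetical builder: chaining only records intent, all() runs SQL."""

    conn: sqlite3.Connection
    where_sql: str = ""
    order_sql: str = ""
    limit_n: Optional[int] = None

    def where_published(self, flag: bool) -> "LazyQuery":
        return replace(self, where_sql=f" WHERE published = {int(flag)}")

    def order_by(self, column: str, direction: str = "asc") -> "LazyQuery":
        # Identifiers are interpolated for brevity; a real builder would validate them.
        return replace(self, order_sql=f" ORDER BY {column} {direction.upper()}")

    def limit(self, n: int) -> "LazyQuery":
        return replace(self, limit_n=n)

    def all(self) -> list:
        # Terminal: assemble the statement and send it to the database.
        sql = "SELECT title FROM post" + self.where_sql + self.order_sql
        if self.limit_n is not None:
            sql += f" LIMIT {self.limit_n}"
        executed.append(sql)
        return self.conn.execute(sql).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE post (title TEXT, published INTEGER)")
conn.executemany("INSERT INTO post VALUES (?, ?)", [("A", 1), ("B", 0), ("C", 1)])

query = LazyQuery(conn).where_published(True).order_by("title", "desc").limit(10)
statements_before = len(executed)  # still 0: chaining ran no SQL
rows = query.all()                 # the terminal issues a single SELECT
print(statements_before, len(executed), rows)
```

Because each chained call returns a new immutable object, a partially built query can be reused as the base for several different terminals.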

What happened

Thanks to Ferro's identity map, Post.get(post.id) returns the same Python object as the post you created earlier — not a duplicate copy. One row, one instance.
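
For intuition, an identity map can be as simple as a cache keyed by primary key. The sketch below shows the general pattern only; Row, IdentityMap, and hydrate are hypothetical names, and Ferro's actual implementation (which lives in its engine) may differ in every detail.

```python
class Row:
    """Stand-in for a model instance."""

    def __init__(self, pk: int, title: str) -> None:
        self.pk = pk
        self.title = title


class IdentityMap:
    """Hypothetical cache: at most one Python object per primary key."""

    def __init__(self) -> None:
        self._instances = {}

    def hydrate(self, pk: int, title: str) -> Row:
        existing = self._instances.get(pk)
        if existing is not None:
            # Already loaded: refresh its data and hand back the same object.
            existing.title = title
            return existing
        row = Row(pk, title)
        self._instances[pk] = row
        return row


identity_map = IdentityMap()
created = identity_map.hydrate(1, "Why Ferro is Fast")
fetched = identity_map.hydrate(1, "Why Ferro is Fast")
other = identity_map.hydrate(2, "Async Patterns")

print(fetched is created)  # same key, same object
print(other is created)    # different key, different object
```

The practical consequence is that a change made through one reference is visible through every other reference to that row.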

Work with Relationships

    # Forward access: awaiting the foreign key loads the related instance
    author = await same_post.author

    # Reverse access: the BackRef is a chainable query
    alice_posts = await author.posts.where(lambda t: t.published == True).all()  # noqa: E712

Two directions, two idioms:

  • Forward (post.author): awaiting the foreign key attribute loads the related Author.
  • Reverse (author.posts): the BackRef is a query, so you can chain .where(), .order_by(), and friends before awaiting it.

Update & Delete

    # Update one instance
    post.title = "Why Ferro is *Really* Fast"
    await post.save()

    # Update many rows at once
    updated = await Post.where(lambda t: t.published == False).update(published=True)  # noqa: E712

    # Delete
    deleted = await Post.where(lambda t: t.title == "Unfinished Draft").delete()

  • For a single instance: mutate attributes, then await post.save().
  • For many rows: chain .update(field=value) or .delete() onto a where() query. Both return the number of affected rows.
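
An "affected rows" count corresponds to what the database itself reports for a set-based statement. As a rough analogy using the standard sqlite3 module (not Ferro, and with made-up sample rows), the cursor's rowcount plays that role:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE post (title TEXT, published INTEGER)")
conn.executemany(
    "INSERT INTO post VALUES (?, ?)",
    [("Async Patterns", 1), ("Unfinished Draft", 0), ("Notes", 0)],
)

# One UPDATE statement flips every draft; rowcount reports how many rows changed.
updated = conn.execute(
    "UPDATE post SET published = 1 WHERE published = 0"
).rowcount

# One DELETE statement removes the matching rows.
deleted = conn.execute(
    "DELETE FROM post WHERE title = ?", ("Unfinished Draft",)
).rowcount

print(updated, deleted)
```

Checking the returned count is a cheap way to confirm that a filter matched what you expected, as the complete script does with its assertions.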

Wrap It in a Transaction

    async with transaction():
        bob = await Author.create(name="Bob", email="bob@example.com")
        await Post.create(title="Hello", body="...", author=bob)
    # Commits on success, rolls back if the block raises

Everything inside async with transaction(): commits together when the block exits cleanly — and rolls back entirely if it raises. Use it whenever multiple writes must succeed or fail as one. More in Transactions.
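
The rollback half is easier to believe once you see it happen. The sketch below demonstrates the same all-or-nothing behaviour with the standard sqlite3 module, whose connection object works as a transaction context manager. It is an analogy rather than Ferro code, and the table is invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE author (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")
conn.execute("INSERT INTO author (email) VALUES ('alice@example.com')")
conn.commit()

try:
    with conn:  # commits on clean exit, rolls back if the block raises
        conn.execute("INSERT INTO author (email) VALUES ('bob@example.com')")
        # Duplicate email violates the UNIQUE constraint and raises IntegrityError.
        conn.execute("INSERT INTO author (email) VALUES ('alice@example.com')")
except sqlite3.IntegrityError:
    pass

# Bob's insert succeeded on its own, but it was rolled back with the failed block.
count = conn.execute("SELECT COUNT(*) FROM author").fetchone()[0]
emails = [row[0] for row in conn.execute("SELECT email FROM author")]
print(count, emails)
```

Without the transaction, the first insert would have stuck while the second failed, leaving the data half-written.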

Complete Script

The whole tutorial as one runnable file — it lives in the repo at docs/examples/quickstart.py:

"""Runnable companion to the Quickstart tutorial (docs/pages/getting-started/quickstart.md)."""

import asyncio
from datetime import datetime
from typing import Annotated

from ferro import BackRef, Field, ForeignKey, Model, Relation, connect, transaction


class Author(Model):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    email: str = Field(unique=True)
    posts: Relation[list["Post"]] = BackRef()


class Post(Model):
    id: int | None = Field(default=None, primary_key=True)
    title: str
    body: str
    published: bool = False
    created_at: datetime = Field(default_factory=datetime.now)
    author: Annotated[Author, ForeignKey(related_name="posts")]


async def main() -> None:
    await connect("sqlite::memory:", auto_migrate=True)

    alice = await Author.create(name="Alice", email="alice@example.com")

    post = await Post.create(
        title="Why Ferro is Fast",
        body="Ferro hands SQL generation and row hydration to a Rust engine...",
        published=True,
        author=alice,
    )

    # Insert many rows in a single statement
    await Post.bulk_create(
        [
            Post(title="Async Patterns", body="...", published=True, author_id=alice.id),
            Post(title="Unfinished Draft", body="...", author_id=alice.id),
        ]
    )
    assert post.id is not None

    # Fetch by primary key
    same_post = await Post.get(post.id)

    # Filter, order, and slice
    published = (
        await Post.where(lambda t: t.published == True)  # noqa: E712
        .order_by(Post.created_at, "desc")
        .limit(10)
        .all()
    )

    # Aggregate terminals
    total = await Post.select().count()
    has_drafts = await Post.where(lambda t: t.published == False).exists()  # noqa: E712
    assert same_post is post  # identity map: same Python object
    assert len(published) == 2
    assert total == 3
    assert has_drafts

    # Forward access: awaiting the foreign key loads the related instance
    author = await same_post.author

    # Reverse access: the BackRef is a chainable query
    alice_posts = await author.posts.where(lambda t: t.published == True).all()  # noqa: E712
    assert author.name == "Alice"
    assert len(alice_posts) == 2

    # Update one instance
    post.title = "Why Ferro is *Really* Fast"
    await post.save()

    # Update many rows at once
    updated = await Post.where(lambda t: t.published == False).update(published=True)  # noqa: E712

    # Delete
    deleted = await Post.where(lambda t: t.title == "Unfinished Draft").delete()
    assert updated == 1
    assert deleted == 1

    async with transaction():
        bob = await Author.create(name="Bob", email="bob@example.com")
        await Post.create(title="Hello", body="...", author=bob)
    # Commits on success, rolls back if the block raises
    assert await Author.select().count() == 2

    print("quickstart example ran successfully")


if __name__ == "__main__":
    asyncio.run(main())

What's Next

  • Next Steps — pick a path based on what you're building
  • Models & Fields — every field type and constraint
  • Queries — the lambda predicate style in depth, ordering, slicing, terminals
  • Relationships — foreign keys, back-references, many-to-many