event-sourced-collection

@eventsourced/db

Event-sourced local-first database built on TanStack DB persistence. Every mutation (insert, update, delete) is automatically captured as an event in a global SQLite log. Events sync to your backend when online — enabling offline-first, partial sync without full-table fetches.

Why Build on TanStack DB?

Building an event-sourced local-first database from scratch means solving:

| Problem | Raw Implementation | TanStack DB Gives You |
| --- | --- | --- |
| Local persistence | Write your own SQLite schema, migrations, WAL, transactions | `persistedCollectionOptions`: battle-tested SQLite layer across 8 platforms |
| Reactive UI updates | Manual subscriptions, diffing, re-renders | `useLiveQuery`: incremental differential dataflow engine |
| Multi-tab coordination | BroadcastChannel, Web Locks, leader election | `BrowserCollectionCoordinator`: built-in |
| Optimistic mutations | Rollback queues, conflict resolution UI | `collection.insert/update/delete` with automatic optimistic state |
| Offline transactions | Outbox pattern, retry, idempotency, connectivity detection | `@tanstack/offline-transactions`: persistent outbox with backoff |
| Query engine | Manual filtering, sorting, pagination | Full SQL-like query builder with indexes |
| Hierarchical data | N+1 queries, manual tree construction | `includes`: nested subqueries in one graph |
| Sync state visibility | Manual flags, separate state stores | Virtual props (`$synced`, `$origin`), queryable directly |
| Reactive side effects | Polling, manual diffing, stale closures | `createEffect`: incremental triggers on query result deltas |
| Cross-platform | Separate implementations per platform | One model: browser, RN, Expo, Node, Electron, Tauri, Capacitor, Cloudflare DO |

The bottom line: TanStack DB already solves the hard infrastructure problems (persistence, reactivity, multi-tab coordination, optimistic updates, cross-platform SQLite). This library adds only the thin event-sourcing layer on top: the event log table, the sync protocol, and the hook wiring. That is roughly 300 lines of logic, compared with an estimated 10,000+ lines to build the same result from scratch.

Building from scratch would mean:

  • Writing your own reactive query engine (months of work)
  • Writing your own persistence layer per platform (wa-sqlite, op-sqlite, better-sqlite3…)
  • Writing your own multi-tab coordination (leader election, BroadcastChannel)
  • Writing your own optimistic mutation system with rollback
  • Testing all of the above across browsers, React Native, Expo, Electron…

Or: depend on @tanstack/db and focus entirely on your domain logic.


TanStack DB 0.6 Features We Leverage

This library is designed around TanStack DB 0.6 capabilities:

persistedCollectionOptions — Durable State Collections

Each state collection (users, todos, etc.) is a normal persisted collection. Data survives restarts, works offline, hydrates instantly from SQLite.

onInsert / onUpdate / onDelete Hooks — Event Capture

We inject hooks into each collection's mutation handlers. Every time you call collection.insert(), collection.update(), or collection.delete(), the hook logs an event to the SQLite event table. Application code needs no extra calls.
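
To make the capture step concrete, here is a minimal in-memory sketch of what a hook might record. It is not this library's implementation: `EventLog`, `LoggedEvent`, and `markSynced` are hypothetical names, an array stands in for the SQLite event table, and the field names are borrowed from the Server Contract section.

```typescript
// Hypothetical in-memory model of event capture.
// An array stands in for the SQLite event table; all names are illustrative.
type MutationType = 'insert' | 'update' | 'delete'

type LoggedEvent = {
  eventId: string
  localSeq: number
  collectionId: string
  type: MutationType
  key: string
  payload: Record<string, unknown>
  timestamp: number
  syncStatus: 'pending' | 'synced'
}

class EventLog {
  private events: LoggedEvent[] = []
  private nextSeq = 1

  // Called from an onInsert/onUpdate/onDelete-style hook: one event per mutation.
  append(
    collectionId: string,
    type: MutationType,
    key: string,
    payload: Record<string, unknown>,
  ): LoggedEvent {
    const localSeq = this.nextSeq++
    const event: LoggedEvent = {
      eventId: `${collectionId}:${localSeq}`, // a real log would use a UUID
      localSeq,
      collectionId,
      type,
      key,
      payload,
      timestamp: Date.now(),
      syncStatus: 'pending',
    }
    this.events.push(event)
    return event
  }

  // Events still waiting to be pushed, in local write order.
  pending(): LoggedEvent[] {
    return this.events.filter((e) => e.syncStatus === 'pending')
  }

  // Mark events as synced once the server has confirmed them.
  markSynced(eventIds: string[]): void {
    const confirmed = new Set(eventIds)
    for (const e of this.events) {
      if (confirmed.has(e.eventId)) e.syncStatus = 'synced'
    }
  }
}
```

In this model a sync pass would push `pending()` to the server and then call `markSynced` with the confirmed ids.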

acceptMutations — Server Event Replay

When pulling events from the server, we replay them into state collections via collection.utils.acceptMutations(). This is TanStack DB's internal API for applying external state changes without triggering mutation hooks (avoiding re-logging).
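
The replay idea can be sketched as a plain reducer over server events. This is a simplified model rather than the library's code path: `RemoteEvent`, `ReplayState`, and `replay` are hypothetical names, and it assumes (as the Server Contract examples suggest) that insert and update events carry the full row in `payload`.

```typescript
// Hypothetical replay reducer: applies server events to plain in-memory state.
// Nothing here writes to an event log, which mirrors the "no re-logging" goal.
type RemoteEvent = {
  globalSeq: number
  collectionId: string
  type: 'insert' | 'update' | 'delete'
  key: string
  payload: Record<string, unknown>
}

type Row = Record<string, unknown>
type ReplayState = Map<string, Map<string, Row>>

function replay(state: ReplayState, events: RemoteEvent[]): ReplayState {
  // Apply in server order so the result does not depend on arrival order.
  const ordered = events.slice().sort((a, b) => a.globalSeq - b.globalSeq)
  for (const ev of ordered) {
    const rows = state.get(ev.collectionId) ?? new Map<string, Row>()
    if (ev.type === 'delete') {
      rows.delete(ev.key)
    } else {
      // Assumption: insert and update both carry the full row.
      rows.set(ev.key, ev.payload)
    }
    state.set(ev.collectionId, rows)
  }
  return state
}
```

Because events are sorted by `globalSeq` before they are applied, two clients that receive the same batch in different orders end up with the same rows.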

createEffect — Auto-Sync on Pending Events

Instead of manual db.sync() calls, you can wire createEffect to auto-sync whenever pending events exist:

import { createEffect, eq } from '@tanstack/db'

createEffect({
  query: (q) =>
    q.from({ event: db.eventLog })
     .where(({ event }) => eq(event.syncStatus, 'pending')),
  skipInitial: false,
  onEnter: async () => {
    if (navigator.onLine) await db.sync()
  },
})

Virtual Props ($synced) — Sync Status in Queries

Use $synced to build outbox-style views showing which data is confirmed vs pending:

const { data: pendingTodos } = useLiveQuery((q) =>
  q.from({ todo: db.collections.todos })
   .where(({ todo }) => eq(todo.$synced, false))
)

@tanstack/offline-transactions — Retry & Connectivity

For production apps, pair with @tanstack/offline-transactions for:

  • Automatic retry with exponential backoff
  • Connectivity detection (online/offline)
  • Leader election (only one tab syncs)
  • Persistent outbox that survives crashes

import { createOfflineTransaction } from '@tanstack/offline-transactions'

const offlineSync = createOfflineTransaction({
  mutationFn: async () => {
    await db.sync()
  },
  retryConfig: { maxRetries: 5, backoffMs: 1000 },
})

includes — Hierarchical Queries Over State Collections

Query across your event-sourced collections with nested includes:

const { data: projects } = useLiveQuery((q) =>
  q.from({ p: db.collections.projects }).select(({ p }) => ({
    id: p.id,
    name: p.name,
    todos: q
      .from({ t: db.collections.todos })
      .where(({ t }) => eq(t.projectId, p.id))
      .select(({ t }) => ({ id: t.id, title: t.title, status: t.status })),
  }))
)

Install

npm install @eventsourced/db @tanstack/db @tanstack/db-sqlite-persistence-core

Plus your platform package:

# Browser (wa-sqlite + OPFS)
npm install @tanstack/browser-db-sqlite-persistence

# React Native (op-sqlite)
npm install @tanstack/react-native-db-sqlite-persistence

# Expo
npm install @tanstack/expo-db-sqlite-persistence

Usage

1. Define Your Data Types

type User = {
  id: string
  name: string
  email: string
  createdAt: number
}

type Todo = {
  id: string
  userId: string
  title: string
  status: 'pending' | 'complete'
  createdAt: number
  updatedAt: number
}

type AppSettings = {
  id: string
  theme: 'light' | 'dark'
  language: string
}

2. Set Up the Database (Browser)

import { createCollection } from '@tanstack/react-db'
import {
  BrowserCollectionCoordinator,
  createBrowserWASQLitePersistence,
  openBrowserWASQLiteOPFSDatabase,
  persistedCollectionOptions,
} from '@tanstack/browser-db-sqlite-persistence'
import { createEventSourcedDB, createBrowserPlatform } from '@eventsourced/db'

const platform = await createBrowserPlatform(
  {
    openBrowserWASQLiteOPFSDatabase,
    createBrowserWASQLitePersistence,
    BrowserCollectionCoordinator,
  },
  { databaseName: 'my-app.sqlite' },
)

const db = await createEventSourcedDB({
  driver: platform.driver,
  persistence: platform.persistence,
  createCollection,
  persistedCollectionOptions,
  sync: {
    push: '/api/events',
    pull: '/api/events',
    headers: () => ({ Authorization: `Bearer ${getAccessToken()}` }),
  },
  collections: {
    users: { getKey: (u: User) => u.id },
    todos: { getKey: (t: Todo) => t.id },
    settings: { getKey: (s: AppSettings) => s.id },
  },
})

3. Write Data (Exactly Like Normal TanStack DB)

// Insert a user (keep the generated id so other rows can reference it)
const userId = crypto.randomUUID()
db.collections.users.insert({
  id: userId,
  name: 'Alice',
  email: 'alice@example.com',
  createdAt: Date.now(),
})

// Insert a todo that references the user
const todoId = crypto.randomUUID()
db.collections.todos.insert({
  id: todoId,
  userId,
  title: 'Buy groceries',
  status: 'pending',
  createdAt: Date.now(),
  updatedAt: Date.now(),
})

// Update the todo by key
db.collections.todos.update(todoId, (draft) => {
  draft.status = 'complete'
  draft.updatedAt = Date.now()
})

// Delete the todo by key
db.collections.todos.delete(todoId)

Every mutation is automatically logged to the event store. You don't call a special dispatch() function. Just use the collections normally.

4. Read Data (Exactly Like Normal TanStack DB)

import { eq, useLiveQuery } from '@tanstack/react-db'

function TodoList() {
  const { data: todos = [] } = useLiveQuery((q) =>
    q.from({ todo: db.collections.todos })
     // Same eq() operator form as the other query examples in this README
     .where(({ todo }) => eq(todo.status, 'pending'))
     .orderBy(({ todo }) => todo.createdAt, 'desc'),
  )

  const { data: users = [] } = useLiveQuery((q) =>
    q.from({ user: db.collections.users }),
  )

  return (
    <ul>
      {todos.map((todo) => (
        <li key={todo.id}>
          {todo.title} — {users.find((u) => u.id === todo.userId)?.name}
        </li>
      ))}
    </ul>
  )
}

5. Sync When Ready

// Manual sync (e.g. on a button press or reconnect)
const result = await db.sync()
console.log(`Pushed ${result.pushed} events, pulled ${result.pulled} events`)

if (result.errors.length > 0) {
  console.error('Sync errors:', result.errors)
}

// Auto-sync on reconnect
window.addEventListener('online', () => db.sync())

// Periodic sync
setInterval(() => {
  if (navigator.onLine) db.sync()
}, 30_000)
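
With both a reconnect listener and a timer, two sync calls can start at almost the same moment. This README does not say whether overlapping db.sync() calls are safe, so the sketch below shows one defensive option: a hypothetical `singleFlight` helper (not part of this library) that shares one in-flight promise between concurrent callers.

```typescript
// Hypothetical helper: collapses overlapping calls into one in-flight promise.
function singleFlight<T>(fn: () => Promise<T>): () => Promise<T> {
  let inFlight: Promise<T> | null = null
  return () => {
    if (inFlight === null) {
      // Start a new run and clear the slot once it settles (success or failure).
      inFlight = fn().finally(() => {
        inFlight = null
      })
    }
    return inFlight
  }
}

// Usage with the database from step 2 (left as a comment so the sketch stays self-contained):
//   const syncOnce = singleFlight(() => db.sync())
//   window.addEventListener('online', () => void syncOnce())
//   setInterval(() => { if (navigator.onLine) void syncOnce() }, 30_000)
```

Callers that arrive while a sync is running receive the same result as the caller that started it; the next call after it settles starts a fresh run.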

React Native Setup

import { createCollection } from '@tanstack/react-db'
import {
  createReactNativeSQLitePersistence,
  persistedCollectionOptions,
} from '@tanstack/react-native-db-sqlite-persistence'
import { createEventSourcedDB, createReactNativePlatform } from '@eventsourced/db'
// op-sqlite is published as @op-engineering/op-sqlite and exports open()
import { open } from '@op-engineering/op-sqlite'

const sqliteDb = open({ name: 'my-app.sqlite' })

const platform = createReactNativePlatform(
  { createReactNativeSQLitePersistence },
  { database: sqliteDb },
)

const db = await createEventSourcedDB({
  driver: platform.driver,
  persistence: platform.persistence,
  createCollection,
  persistedCollectionOptions,
  sync: {
    push: 'https://api.myapp.com/events',
    pull: 'https://api.myapp.com/events',
  },
  collections: {
    users: { getKey: (u: User) => u.id },
    todos: { getKey: (t: Todo) => t.id },
  },
})

Everything else (insert, update, delete, useLiveQuery, sync) works identically.


Custom Sync Transport

For non-HTTP sync (WebSocket, gRPC, direct database), provide a custom transport:

import type { SyncTransport } from '@eventsourced/db'

const websocketTransport: SyncTransport = {
  async push(events) {
    const response = await ws.send('events:push', events)
    return response.confirmed
  },
  async pull(since) {
    const response = await ws.send('events:pull', { since })
    return {
      events: response.events,
      cursor: response.cursor,
      hasMore: response.hasMore,
    }
  },
}

const db = await createEventSourcedDB({
  // ...
  sync: websocketTransport,
  // ...
})
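
For tests, an in-memory transport can stand in for a real backend. The sketch below is an assumption-heavy illustration: the types are inferred from the WebSocket example and the Server Contract in this README, not copied from the library's `SyncTransport` definition, and `createMemoryTransport` is a hypothetical name.

```typescript
// Assumed wire shapes, inferred from the examples in this README.
type ClientEvent = {
  eventId: string
  collectionId: string
  type: 'insert' | 'update' | 'delete'
  key: string
  payload: Record<string, unknown>
  timestamp: number
}
type Confirmed = { eventId: string; globalSeq: number }
type ServerEvent = ClientEvent & { globalSeq: number }
type PullResult = { events: ServerEvent[]; cursor: string; hasMore: boolean }

// Hypothetical test double: keeps the "server" log in a local array.
function createMemoryTransport(pageSize = 100) {
  const log: ServerEvent[] = []
  return {
    async push(events: ClientEvent[]): Promise<Confirmed[]> {
      return events.map((incoming) => {
        // Idempotent: a retried event keeps its original globalSeq.
        let stored = log.find((e) => e.eventId === incoming.eventId)
        if (!stored) {
          stored = { ...incoming, globalSeq: log.length + 1 }
          log.push(stored)
        }
        return { eventId: stored.eventId, globalSeq: stored.globalSeq }
      })
    },
    async pull(since?: string): Promise<PullResult> {
      const after = Number(since ?? '0')
      const newer = log.filter((e) => e.globalSeq > after)
      const events = newer.slice(0, pageSize)
      const last = events[events.length - 1]
      return {
        events,
        cursor: last ? String(last.globalSeq) : String(after),
        hasMore: newer.length > events.length,
      }
    },
  }
}
```

If the real `SyncTransport` type matches this shape, the object could be passed as the `sync` option; check the exported type before relying on that.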

Multi-Device Scenario

User has the app open on their phone and laptop:

Phone (offline):
  insert todo "Buy milk"     → localSeq: 1, pending
  update todo status          → localSeq: 2, pending

Laptop (online):
  insert todo "Call dentist"  → localSeq: 1, pushed → globalSeq: 50

Phone comes online, syncs:
  1. Push: sends localSeq 1,2 to server → globalSeq: 51, 52
  2. Pull: gets globalSeq 50 (laptop's event) → replays into local todos collection

Both devices now have: "Buy milk", "Call dentist"

No full-table download and no hand-written merge logic: the server assigns a single global order, and each client replays the events it has not yet seen.
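
The scenario above can be reproduced with a small simulation. This is a toy model under stated assumptions, not the library's sync code: `ServerSim` and `DeviceSim` are hypothetical classes, only inserts are modelled, and everything lives in memory.

```typescript
// Hypothetical simulation of the phone/laptop scenario; inserts only.
type SimEvent = { eventId: string; title: string }
type OrderedEvent = SimEvent & { globalSeq: number }

class ServerSim {
  private log: OrderedEvent[] = []
  private nextSeq: number

  constructor(startSeq: number) {
    this.nextSeq = startSeq
  }

  // Assign the next globalSeq to each new event; retried events are ignored.
  push(events: SimEvent[]): void {
    for (const e of events) {
      if (this.log.some((stored) => stored.eventId === e.eventId)) continue
      this.log.push({ ...e, globalSeq: this.nextSeq++ })
    }
  }

  pull(since: number): OrderedEvent[] {
    return this.log.filter((e) => e.globalSeq > since)
  }
}

class DeviceSim {
  readonly todos = new Set<string>()
  private pending: SimEvent[] = []
  private seen = new Set<string>()
  private cursor = 0
  private localSeq = 0
  private readonly id: string

  constructor(id: string) {
    this.id = id
  }

  // Local write: applied immediately and queued as a pending event.
  insert(title: string): void {
    const eventId = `${this.id}-${++this.localSeq}`
    this.todos.add(title)
    this.seen.add(eventId)
    this.pending.push({ eventId, title })
  }

  // Push pending events, then replay anything this device has not seen.
  sync(server: ServerSim): void {
    server.push(this.pending)
    this.pending = []
    for (const e of server.pull(this.cursor)) {
      if (!this.seen.has(e.eventId)) {
        this.todos.add(e.title)
        this.seen.add(e.eventId)
      }
      this.cursor = Math.max(this.cursor, e.globalSeq)
    }
  }
}

const server = new ServerSim(50)
const phone = new DeviceSim('phone')
const laptop = new DeviceSim('laptop')

phone.insert('Buy milk') // phone is offline, so the event stays pending
laptop.insert('Call dentist')
laptop.sync(server) // laptop's event becomes globalSeq 50
phone.sync(server) // phone's event becomes globalSeq 51, then it pulls 50
laptop.sync(server) // laptop pulls 51

// Both devices now hold 'Buy milk' and 'Call dentist'.
console.log(Array.from(phone.todos).sort(), Array.from(laptop.todos).sort())
```

Each device transfers only the events after its own cursor, which is the "no full-table download" property described above.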


Server Contract

Your backend implements two endpoints:

POST /api/events — Receive client events

Request body:

[
  {
    "eventId": "550e8400-e29b-41d4-a716-446655440000",
    "collectionId": "todos",
    "type": "insert",
    "key": "todo-1",
    "payload": { "id": "todo-1", "title": "Buy milk", "status": "pending", "userId": "u1", "createdAt": 1714490000000, "updatedAt": 1714490000000 },
    "timestamp": 1714490000000
  },
  {
    "eventId": "550e8400-e29b-41d4-a716-446655440001",
    "collectionId": "todos",
    "type": "update",
    "key": "todo-1",
    "payload": { "id": "todo-1", "title": "Buy milk", "status": "complete", "userId": "u1", "createdAt": 1714490000000, "updatedAt": 1714491000000 },
    "timestamp": 1714491000000
  }
]

Response:

{
  "confirmed": [
    { "eventId": "550e8400-e29b-41d4-a716-446655440000", "globalSeq": 100 },
    { "eventId": "550e8400-e29b-41d4-a716-446655440001", "globalSeq": 101 }
  ]
}

GET /api/events?since={globalSeq} — Send new events to client

Response:

{
  "events": [
    {
      "globalSeq": 102,
      "eventId": "660e8400-e29b-41d4-a716-446655440002",
      "collectionId": "users",
      "type": "insert",
      "key": "u2",
      "payload": { "id": "u2", "name": "Bob", "email": "bob@example.com", "createdAt": 1714492000000 },
      "timestamp": 1714492000000,
      "cursor": "102"
    }
  ],
  "cursor": "102",
  "hasMore": false
}
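
Because the pull response is paginated through `cursor` and `hasMore`, a client has to keep requesting pages until the server reports that none remain. The loop below is a minimal sketch of that behaviour, not the library's implementation: `pullAll`, `EventPage`, and the injected `fetchPage` function are hypothetical, and `maxPages` is a safety cap added for illustration.

```typescript
// Assumed response shape, taken from the GET /api/events example above.
type PulledEvent = { globalSeq: number; eventId: string }
type EventPage = { events: PulledEvent[]; cursor: string; hasMore: boolean }

// Hypothetical helper: drains all pages starting from a cursor.
async function pullAll(
  fetchPage: (since: string) => Promise<EventPage>,
  since = '0',
  maxPages = 1000,
): Promise<{ events: PulledEvent[]; cursor: string }> {
  const events: PulledEvent[] = []
  let cursor = since
  for (let page = 0; page < maxPages; page++) {
    const result = await fetchPage(cursor)
    events.push(...result.events)
    cursor = result.cursor // resume from the server-provided cursor
    if (!result.hasMore) break
  }
  return { events, cursor }
}
```

In an app, `fetchPage` might wrap a request to `/api/events?since=<cursor>` and parse the JSON body; the returned `cursor` would then be stored locally as the starting point for the next sync.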

Server Implementation (Hono + PostgreSQL)

import { Hono } from 'hono'
import { z } from 'zod'
import { zValidator } from '@hono/zod-validator'
import type { Pool } from 'pg'

const EventSchema = z.object({
  eventId: z.string().uuid(),
  collectionId: z.string().min(1),
  type: z.enum(['insert', 'update', 'delete']),
  key: z.union([z.string(), z.number()]),
  payload: z.record(z.unknown()),
  timestamp: z.number().int().positive(),
})

export function eventsRouter(db: Pool) {
  const router = new Hono()

  router.post('/', zValidator('json', z.array(EventSchema).min(1).max(500)), async (c) => {
    const incoming = c.req.valid('json')
    const client = await db.connect()

    try {
      await client.query('BEGIN')

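      // Look up events that are already stored so client retries stay idempotent
      const existingResult = await client.query(
        `SELECT event_id, global_seq FROM events WHERE event_id = ANY($1)`,
        [incoming.map((e) => e.eventId)],
      )
      // node-postgres returns BIGINT columns as strings, so convert explicitly
      const knownSeqs = new Map<string, number>()
      for (const row of existingResult.rows) {
        knownSeqs.set(row.event_id, Number(row.global_seq))
      }

      const confirmed: Array<{ eventId: string; globalSeq: number }> = []

      for (const event of incoming) {
        // Re-confirm duplicates with their original sequence instead of dropping them,
        // otherwise a client whose first response was lost would never see a confirmation
        const knownSeq = knownSeqs.get(event.eventId)
        if (knownSeq !== undefined) {
          confirmed.push({ eventId: event.eventId, globalSeq: knownSeq })
          continue
        }

        const result = await client.query(
          `INSERT INTO events (event_id, collection_id, type, key, payload, client_timestamp)
           VALUES ($1, $2, $3, $4, $5, $6)
           RETURNING global_seq`,
          [event.eventId, event.collectionId, event.type, String(event.key), event.payload, event.timestamp],
        )

        const globalSeq = Number(result.rows[0].global_seq)
        knownSeqs.set(event.eventId, globalSeq) // also covers duplicates inside one batch
        confirmed.push({ eventId: event.eventId, globalSeq })
      }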

      await client.query('COMMIT')
      return c.json({ confirmed })
    } catch (err) {
      await client.query('ROLLBACK')
      throw err
    } finally {
      client.release()
    }
  })

  router.get('/', async (c) => {
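    // Fall back to defaults when a query parameter is missing or not numeric
    const since = parseInt(c.req.query('since') ?? '0', 10) || 0
    const requestedLimit = parseInt(c.req.query('limit') ?? '500', 10) || 500
    const limit = Math.min(Math.max(requestedLimit, 1), 1000)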

    const result = await db.query(
      `SELECT global_seq, event_id, collection_id, type, key, payload, client_timestamp
       FROM events
       WHERE global_seq > $1
       ORDER BY global_seq ASC
       LIMIT $2`,
      [since, limit],
    )

    // node-postgres returns BIGINT columns as strings, so convert them to numbers
    const events = result.rows.map((row) => ({
      globalSeq: Number(row.global_seq),
      eventId: row.event_id,
      collectionId: row.collection_id,
      type: row.type,
      key: row.key,
      payload: row.payload,
      timestamp: Number(row.client_timestamp),
      cursor: String(row.global_seq),
    }))

    const cursor = events.length > 0 ? events[events.length - 1].cursor : String(since)

    return c.json({ events, cursor, hasMore: events.length === limit })
  })

  return router
}

PostgreSQL Schema

CREATE TABLE events (
  global_seq        BIGSERIAL PRIMARY KEY,
  event_id          TEXT      NOT NULL UNIQUE,
  collection_id     TEXT      NOT NULL,
  type              TEXT      NOT NULL CHECK (type IN ('insert', 'update', 'delete')),
  key               TEXT      NOT NULL,
  payload           JSONB     NOT NULL,
  client_timestamp  BIGINT    NOT NULL,
  server_timestamp  BIGINT    NOT NULL DEFAULT (EXTRACT(EPOCH FROM now()) * 1000)::BIGINT
);

-- global_seq (PRIMARY KEY) and event_id (UNIQUE) are already indexed by their constraints,
-- so only the per-collection lookup needs an explicit index.
CREATE INDEX idx_events_collection    ON events (collection_id, global_seq);

Cleanup

// On app unmount or hot reload
db.dispose()
await platform.close() // browser only — closes SQLite + coordinator

API Reference

createEventSourcedDB(config)

Creates an event-sourced database instance.

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| `driver` | `SQLiteDriver` | Yes | SQLite driver instance |
| `persistence` | `PersistedCollectionPersistence` | Yes | TanStack DB persistence config |
| `createCollection` | `Function` | Yes | TanStack DB's createCollection |
| `persistedCollectionOptions` | `Function` | Yes | From your platform package |
| `collections` | `Record<string, CollectionDef>` | Yes | Your collection definitions |
| `sync` | `SyncUrlConfig \| SyncTransport` | No | Remote sync configuration |
| `schemaVersion` | `number` | No | Global schema version (default: 1) |

Returns: EventSourcedDB

| Property | Type | Description |
| --- | --- | --- |
| `collections` | `Object` | Your typed collections (insert/update/delete/query) |
| `sync()` | `() => Promise<SyncResult>` | Push pending + pull new events |
| `dispose()` | `() => void` | Cleanup resources |

createBrowserPlatform(deps, config)

Helper to set up browser SQLite + coordinator.

createReactNativePlatform(deps, config)

Helper to set up React Native SQLite persistence.

SyncResult

type SyncResult = {
  pushed: number          // events sent to server
  pulled: number          // events received from server
  errors: Array<Error>    // non-fatal errors (push or pull may partially succeed)
}
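
Since `errors` is described as non-fatal, a caller may want to retry a sync that finished with errors. The sketch below shows one way to do that with exponential backoff. `syncWithBackoff` is a hypothetical helper, not part of this library, and it assumes that `sync()` resolves with a populated `errors` array rather than rejecting.

```typescript
type SyncResult = {
  pushed: number
  pulled: number
  errors: Array<Error>
}

// Hypothetical helper: retries a sync function while it keeps reporting errors.
async function syncWithBackoff(
  sync: () => Promise<SyncResult>,
  options: { maxAttempts?: number; baseDelayMs?: number } = {},
): Promise<SyncResult> {
  const maxAttempts = options.maxAttempts ?? 5
  const baseDelayMs = options.baseDelayMs ?? 1000
  const total: SyncResult = { pushed: 0, pulled: 0, errors: [] }

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const result = await sync()
    total.pushed += result.pushed
    total.pulled += result.pulled
    total.errors = result.errors // keep only the latest attempt's errors
    if (result.errors.length === 0) break
    if (attempt < maxAttempts - 1) {
      // Exponential backoff: baseDelayMs, then 2x, 4x, ...
      const delayMs = baseDelayMs * 2 ** attempt
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  return total
}

// Usage with the database from the Usage section (comment only, to stay self-contained):
//   const result = await syncWithBackoff(() => db.sync(), { maxAttempts: 3 })
```

The returned counts are summed across attempts, while `errors` reflects only the final attempt, so an empty array means the last sync completed cleanly.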