Data Pipeline
Understand how data flows through Gluonic from storage to UI and back.
Overview
Gluonic maintains three separate data stores with distinct purposes:
┌─────────────────────────────────────────┐
│ 1. ObjectPool (In-Memory)               │
│    Current state: optimistic + confirmed│
│    Observable: Yes (MobX)               │
│    Persisted: No                        │
│    UI reads from: Here ✓                │
└─────────────────────────────────────────┘
                     ↕
┌─────────────────────────────────────────┐
│ 2. Transaction Queue (Local Storage)    │
│    Pending mutations                    │
│    Observable: No                       │
│    Persisted: Yes                       │
│    Replayed on: App restart             │
└─────────────────────────────────────────┘
                     ↕
┌─────────────────────────────────────────┐
│ 3. Storage Database (Drizzle adapter)   │
│    Confirmed server data only           │
│    Observable: No                       │
│    Persisted: Yes                       │
│    Source of truth: On restart          │
└─────────────────────────────────────────┘
Read Pipeline
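The read path traced in this section rests on one idea: the model instance holds no field data, and a Proxy GET trap forwards each property read to the pool. The following is a minimal sketch of that idea only. `SketchPool` and `makeModelProxy` are hypothetical names, and a plain `Map` stands in for the MobX observable map, so no dependency tracking happens here.

```typescript
// Hypothetical sketch: names below are illustrative, not Gluonic's API.
interface WireRow {
  t: string                   // model type, e.g. 'post'
  id: string                  // entity id
  p: Record<string, unknown>  // payload (the entity's fields)
}

class SketchPool {
  // Gluonic's pool is a MobX observable map; a plain Map keeps this runnable.
  private rows = new Map<string, WireRow>()

  upsert(row: WireRow): void {
    this.rows.set(`${row.t}:${row.id}`, row)
  }

  get(t: string, id: string): WireRow | undefined {
    return this.rows.get(`${t}:${id}`)
  }
}

// The model object stores no field data; every read is forwarded to the pool.
function makeModelProxy<T extends object>(pool: SketchPool, t: string, id: string): T {
  return new Proxy({} as T, {
    get(_target, prop) {
      if (typeof prop !== 'string') return undefined
      return pool.get(t, id)?.p[prop]
    },
  })
}

const readPool = new SketchPool()
readPool.upsert({ t: 'post', id: '123', p: { title: 'Hello' } })

const post = makeModelProxy<{ title: string }>(readPool, 'post', '123')
const titleBefore = post.title

// A later upsert is visible through the same proxy without re-creating it.
readPool.upsert({ t: 'post', id: '123', p: { title: 'New Title' } })
const titleAfter = post.title
```

Because the proxy looks the row up on every access, a model instance never goes stale. With an observable map in place of the plain `Map`, the same lookup is presumably what lets MobX register the dependency.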
// Component renders
const PostDetail = observer(({ postId }) => {
  const post = useModel<Post>('post', postId)
  return <h1>{post.title}</h1>
})
// Behind the scenes:
// 1. useModel → IdentityMap.getModel('post', '123')
// 2. IdentityMap checks cache
// - Cache hit → return cached instance
// - Cache miss → create new instance
// 3. instantiateModel creates:
// - new Post()
// - Wrapped in Proxy
// - Store reference attached (WeakMap)
// 4. Proxy GET trap intercepts post.title
// 5. Reads from pool: store.pool.get('post', '123').p.title
// 6. MobX tracks: "PostDetail depends on post:123.title"
// 7. Returns value to component
Write Pipeline
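The phases traced in this section can be condensed into two operations: a save that applies the patch optimistically and queues it together with the previous values, and a settle step that either keeps the change or rolls it back. The sketch below is a hedged illustration under simplifying assumptions. An in-memory array replaces the persisted queue, the server call is omitted, and `saveSketch` and `settleSketch` are hypothetical helpers rather than Gluonic functions.

```typescript
// Hypothetical sketch of the write phases; not Gluonic's actual implementation.
type Payload = Record<string, unknown>

interface PendingTx {
  id: string
  type: string
  modelId: string
  patch: Payload  // fields being changed
  prev: Payload   // previous values of those fields, kept for rollback
}

const rows = new Map<string, Payload>()  // stands in for the ObjectPool
const queue: PendingTx[] = []            // stands in for the persisted queue

function saveSketch(txId: string, type: string, modelId: string, patch: Payload): PendingTx {
  const key = `${type}:${modelId}`
  const existing = rows.get(key) ?? {}

  // Capture the old values of exactly the fields the patch touches.
  const prev: Payload = {}
  for (const field of Object.keys(patch)) prev[field] = existing[field]

  rows.set(key, { ...existing, ...patch })  // Phase 1: optimistic update
  const tx: PendingTx = { id: txId, type, modelId, patch, prev }
  queue.push(tx)                            // Phase 2: enqueue
  return tx
}

function settleSketch(tx: PendingTx, ok: boolean): void {
  if (!ok) {
    // Phase 4b: restore the previous values on top of the current row.
    const key = `${tx.type}:${tx.modelId}`
    rows.set(key, { ...(rows.get(key) ?? {}), ...tx.prev })
  }
  // Phases 4a and 4b both remove the transaction from the queue.
  const index = queue.findIndex((queued) => queued.id === tx.id)
  if (index !== -1) queue.splice(index, 1)
}

rows.set('post:123', { title: 'Old Title', body: 'Text' })
const tx = saveSketch('tx-456', 'post', '123', { title: 'New Title' })
const optimisticTitle = rows.get('post:123')?.title
const pendingCount = queue.length

settleSketch(tx, false)  // simulate a 'conflict' result from the server
const revertedTitle = rows.get('post:123')?.title
```

Storing `prev` alongside `patch` is what makes the rollback local: the client can undo the change without asking the server for the old row. The sketch restores only the patched fields, so unrelated fields such as `body` are left untouched.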
// User clicks save
await store.save('post', '123', { title: 'New Title' })
// Phase 1: Optimistic Update (< 1ms)
store.pool.upsert({
  t: 'post',
  id: '123',
  p: { ...existing, title: 'New Title' }
})
// MobX → Component re-renders ✓
// Phase 2: Queue (< 5ms)
await storage.enqueueTx({
  id: 'tx-456',
  type: 'post',
  modelId: '123',
  patch: { title: 'New Title' },
  prev: { title: 'Old Title' },
  op: 'u',
  createdAt: Date.now()
})
// Survives app restart ✓
// Phase 3: Server Sync (background)
POST /sync/v1/tx {
  ops: [{
    t: 'post',
    id: '123',
    op: 'u',
    patch: { title: 'New Title' },
    clientTxId: 'tx-456'
  }]
}
// Phase 4a: Success Response
{
  ok: true,
  results: [{ ok: true, id: '123', v: 5 }]
}
// → storage.putRow (persist confirmed data)
// → storage.dequeueTx (remove from queue)
// Pool already has the data ✓
// Phase 4b: Error Response
{
  ok: true,
  results: [{ ok: false, error: 'conflict' }]
}
// → pool.upsert (roll back to the prev values saved with the queued tx)
// → storage.dequeueTx (remove from queue)
// → UI reverts automatically ✓
Sync Pipeline
Bootstrap Flow
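The bootstrap steps traced in this section boil down to three writes in a fixed order: persist the rows, load them into memory, then record the sync cursor. The sketch below illustrates that order with in-memory stand-ins. `MemoryStorage` and `applyBootstrap` are hypothetical names, the response is canned rather than fetched from `GET /sync/v1/bootstrap`, and the payload values are sample data.

```typescript
// Hypothetical sketch; the response shape mirrors the bootstrap example in this section.
interface BootstrapRow {
  t: string
  id: string
  v: number
  p: Record<string, unknown>
}

interface BootstrapResponse {
  lastSid: number
  rows: BootstrapRow[]
}

class MemoryStorage {
  rows = new Map<string, BootstrapRow>()  // stands in for the storage database
  lastSyncId = 0

  putRows(rows: BootstrapRow[]): void {
    for (const row of rows) this.rows.set(`${row.t}:${row.id}`, row)
  }
}

const bootPool = new Map<string, BootstrapRow>()  // stands in for the ObjectPool

function applyBootstrap(storage: MemoryStorage, response: BootstrapResponse): void {
  storage.putRows(response.rows)             // persist confirmed rows
  for (const row of response.rows) {
    bootPool.set(`${row.t}:${row.id}`, row)  // load into memory
  }
  storage.lastSyncId = response.lastSid      // advance the cursor last
}

// Canned response with sample payloads, in place of the HTTP call.
const bootResponse: BootstrapResponse = {
  lastSid: 1000,
  rows: [
    { t: 'user', id: 'u1', v: 1, p: { name: 'Ada' } },
    { t: 'post', id: 'p1', v: 3, p: { title: 'Hello' } },
  ],
}

const bootStorage = new MemoryStorage()
applyBootstrap(bootStorage, bootResponse)
```

Writing the cursor last is a plausible reading of the order shown in the flow: if the app stops before that step, the stored cursor is still 0 and the next start would repeat the bootstrap instead of skipping rows.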
App Start
↓
store.init()
↓
lastSyncId = await storage.getLastSyncId() // 0
↓
store.bootstrap()
↓
GET /sync/v1/bootstrap
↓
Server response:
{
  "lastSid": 1000,
  "rows": [
    { "t": "user", "id": "u1", "v": 1, "p": { ... } },
    { "t": "post", "id": "p1", "v": 3, "p": { ... } },
    ...
  ]
}
↓
storage.putRows(rows) // Write to SQLite
pool.upsert(rows) // Load into memory
↓
storage.setLastSyncId(1000)
↓
Bootstrap complete ✓
Delta Flow
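The delta steps traced in this section apply a list of frames and then move the cursor to the highest sync id seen. The sketch below shows one way that loop might look. It assumes that an update frame (`op: 'u'`) carries a partial payload that is merged into the existing row and that an insert frame (`op: 'i'`) carries the full row. Neither assumption is confirmed by this document, and `applyFramesSketch` is a hypothetical helper.

```typescript
// Hypothetical sketch of applying delta frames; merge semantics for 'u' are an assumption.
interface SyncFrame {
  sid: number    // server sync id, increasing
  t: string
  id: string
  op: 'i' | 'u'  // insert or update, as in the example frames
  p: Record<string, unknown>
}

const deltaPool = new Map<string, Record<string, unknown>>()
let deltaCursor = 1000  // value returned by getLastSyncId() in the flow

function applyFramesSketch(frames: SyncFrame[]): void {
  // Apply in sid order so that later changes win.
  const ordered = [...frames].sort((a, b) => a.sid - b.sid)
  for (const frame of ordered) {
    const key = `${frame.t}:${frame.id}`
    // Updates merge into the current row; inserts start from an empty row.
    const base = frame.op === 'u' ? (deltaPool.get(key) ?? {}) : {}
    deltaPool.set(key, { ...base, ...frame.p })
    deltaCursor = Math.max(deltaCursor, frame.sid)
  }
}

deltaPool.set('post:p1', { title: 'Hello', body: 'Text' })
applyFramesSketch([
  { sid: 1002, t: 'user', id: 'u2', op: 'i', p: { name: 'Grace' } },
  { sid: 1001, t: 'post', id: 'p1', op: 'u', p: { title: 'Updated' } },
])
```

Sorting by `sid` makes the result independent of the order in which frames arrive in the array, and `Math.max` keeps the cursor from moving backwards.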
Periodic sync or manual catchUp()
↓
store.catchUp()
↓
since = await storage.getLastSyncId() // 1000
↓
GET /sync/v1/delta?since=1000
↓
Server response: SyncFrame[]
[
  { "sid": 1001, "t": "post", "id": "p1", "op": "u", "p": { "title": "Updated" } },
  { "sid": 1002, "t": "user", "id": "u2", "op": "i", "p": { ... } }
]
↓
store.applyFrames(frames)
↓
For each frame:
  pool.upsert() // Update memory
  storage.putRow() // Persist to database
  bridge.invalidate() // Clear cache
↓
storage.setLastSyncId(1002)
↓
Delta sync complete ✓
Real-Time Flow
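In the real-time flow traced in this section, frames arrive over the WebSocket one at a time, and the same change may also arrive through a delta sync. A client therefore benefits from ignoring frames it has already applied. The sketch below illustrates that guard. It assumes that each message is the JSON text of a single frame and that frames at or below the stored cursor can be skipped; neither is stated in this document, and `onSocketMessage` is a hypothetical handler.

```typescript
// Hypothetical sketch; skipping frames at or below the cursor is an assumption.
interface LiveFrame {
  sid: number
  t: string
  id: string
  op: string
  p: Record<string, unknown>
}

const livePool = new Map<string, Record<string, unknown>>()
let liveCursor = 1002  // last sync id already applied by the delta flow

// Called with the text payload of each WebSocket message.
// Returns true when the frame was applied, false when it was skipped.
function onSocketMessage(raw: string): boolean {
  const frame = JSON.parse(raw) as LiveFrame
  if (frame.sid <= liveCursor) return false  // already applied; ignore
  const key = `${frame.t}:${frame.id}`
  livePool.set(key, { ...(livePool.get(key) ?? {}), ...frame.p })
  liveCursor = frame.sid
  return true
}

const message = JSON.stringify({ sid: 1003, t: 'post', id: 'p1', op: 'u', p: { title: 'Live' } })
const firstApplied = onSocketMessage(message)
const secondApplied = onSocketMessage(message)  // duplicate delivery
```

With this guard, applying a frame twice has the same effect as applying it once, which keeps the WebSocket and delta paths from interfering with each other.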
WebSocket connects
↓
wss://api.example.com/sync/v1/ws?token=JWT
↓
Server authenticates → adds to socket pool
↓
Another client makes mutation
↓
Server writes SyncAction → broadcasts frame
↓
Redis pub/sub → all pods
↓
Pod sends frame to WebSocket clients
↓
This client receives frame
↓
store.applyFrames([frame])
↓
UI updates in real-time ✓
Memory Management
ObjectPool
// Single observable map
pool.map = observable.map<string, WireRow>()
// Efficient storage
// Key: "type:id" → "post:123"
// Value: { t: 'post', id: '123', p: { ... } }
// O(1) lookups
pool.get('post', '123') // Instant
// Minimal memory
// One copy per entity (no duplicates)
Identity Map (IdentityMap)
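The core guarantee of an identity map is that asking twice for the same type and id yields the same object. The sketch below shows only that guarantee. `SketchIdentityMap` is a hypothetical class, and for simplicity it holds instances strongly, so it does not model the weak-reference behavior described in this section.

```typescript
// Hypothetical sketch of an identity map; not Gluonic's IdentityMap class.
class SketchIdentityMap<M extends object> {
  private cache = new Map<string, M>()
  private create: (type: string, id: string) => M

  constructor(create: (type: string, id: string) => M) {
    this.create = create
  }

  getModel(type: string, id: string): M {
    const key = `${type}:${id}`
    let model = this.cache.get(key)
    if (model === undefined) {
      model = this.create(type, id)  // cache miss: build and remember
      this.cache.set(key, model)
    }
    return model                     // cache hit: same instance as before
  }
}

let createdCount = 0
const identity = new SketchIdentityMap((type, id) => {
  createdCount += 1
  return { type, id }
})

const firstModel = identity.getModel('post', '123')
const sameModel = identity.getModel('post', '123')
const otherModel = identity.getModel('post', '456')
```

Returning the same instance means that reference equality (`===`) can be used to compare models, and that any component holding a model sees the same object as every other component.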
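// Weak references: values are wrapped in WeakRef, because a plain
// Map<string, Model> would keep every instance alive
cache = new Map<string, WeakRef<Model>>()
// Instances garbage collected when unused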
// No memory leaks
Lazy References
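A lazy reference, as described in this section, keeps only the foreign key and resolves the target when it is read. The sketch below is one hedged way to express that. `LazyRefSketch` is a hypothetical class, and it resolves against a local lookup function; whether Gluonic's `LazyReference` also fetches missing rows from the server is not covered here.

```typescript
// Hypothetical sketch; LazyRefSketch is not Gluonic's LazyReference class.
class LazyRefSketch<T> {
  private foreignKey: string
  private lookup: (id: string) => T | undefined

  constructor(foreignKey: string, lookup: (id: string) => T | undefined) {
    this.foreignKey = foreignKey
    this.lookup = lookup
  }

  // The key is always available, even when the target is not loaded.
  get id(): string {
    return this.foreignKey
  }

  // Resolved on every access; the reference itself caches nothing.
  get value(): T | undefined {
    return this.lookup(this.foreignKey)
  }
}

interface User {
  name: string
}

const users = new Map<string, User>()  // stands in for the pool
const authorRef = new LazyRefSketch<User>('u1', (id) => users.get(id))

const beforeLoad = authorRef.value     // target not loaded yet
users.set('u1', { name: 'Ada' })       // sample data
const afterLoad = authorRef.value
```

Because the reference stores only the key, a post does not keep its author's data alive, and the author becomes visible through the reference as soon as it is present in the lookup source.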
// References don't hold data
author: LazyReference<User>
// Just tracks foreign key
// Loads on demand
// Releases when unused
Next Steps
- Data Pipeline - Complete flow
- Sync Protocol - Wire format
- Reactivity - MobX deep dive