Skip to Content
LearnArchitectureData Pipeline

Data Pipeline

Understand how data flows through Gluonic from storage to UI and back.

Overview

Gluonic maintains three separate data stores with distinct purposes:

┌─────────────────────────────────────────┐ │ 1. ObjectPool (In-Memory) │ │ Current state: optimistic + confirmed│ │ Observable: Yes (MobX) │ │ Persisted: No │ │ UI reads from: Here ✓ │ └─────────────────────────────────────────┘ ┌─────────────────────────────────────────┐ │ 2. Transaction Queue (Local Storage) │ │ Pending mutations │ │ Observable: No │ │ Persisted: Yes │ │ Replayed on: App restart │ └─────────────────────────────────────────┘ ┌─────────────────────────────────────────┐ │ 3. Storage Database (Drizzle adapter) │ │ Confirmed server data only │ │ Observable: No │ │ Persisted: Yes │ │ Source of truth: On restart │ └─────────────────────────────────────────┘

Read Pipeline

// Component renders const PostDetail = observer(({ postId }) => { const post = useModel<Post>('post', postId) return <h1>{post.title}</h1> }) // Behind the scenes: // 1. useModel → IdentityMap.getModel('post', '123') // 2. IdentityMap checks cache // - Cache hit → return cached instance // - Cache miss → create new instance // 3. instantiateModel creates: // - new Post() // - Wrapped in Proxy // - Store reference attached (WeakMap) // 4. Proxy GET trap intercepts post.title // 5. Reads from pool: store.pool.get('post', '123').p.title // 6. MobX tracks: "PostDetail depends on post:123.title" // 7. Returns value to component

Write Pipeline

// User clicks save await store.save('post', '123', { title: 'New Title' }) // Phase 1: Optimistic Update (< 1ms) store.pool.upsert({ t: 'post', id: '123', p: { ...existing, title: 'New Title' } }) // MobX → Component re-renders ✓ // Phase 2: Queue (< 5ms) await storage.enqueueTx({ id: 'tx-456', type: 'post', modelId: '123', patch: { title: 'New Title' }, prev: { title: 'Old Title' }, op: 'u', createdAt: Date.now() }) // Survives app restart ✓ // Phase 3: Server Sync (background) POST /sync/v1/tx { ops: [{ t: 'post', id: '123', op: 'u', patch: { title: 'New Title' }, clientTxId: 'tx-456' }] } // Phase 4a: Success Response { ok: true, results: [{ ok: true, id: '123', v: 5 }] } // → storage.putRow (persist confirmed data) // → storage.dequeueTx (remove from queue) // Pool already has the data ✓ // Phase 4b: Error Response { ok: true, results: [{ ok: false, error: 'conflict' }] } // → pool.upsert (rollback to previous) // → storage.dequeueTx (remove from queue) // → UI reverts automatically ✓

Sync Pipeline

Bootstrap Flow

App Start store.init() lastSyncId = await storage.getLastSyncId() // 0 store.bootstrap() GET /sync/v1/bootstrap Server response: { "lastSid": 1000, "rows": [ { "t": "user", "id": "u1", "v": 1, "p": { ... } }, { "t": "post", "id": "p1", "v": 3, "p": { ... } }, ... ] } storage.putRows(rows) // Write to SQLite pool.upsert(rows) // Load into memory storage.setLastSyncId(1000) Bootstrap complete ✓

Delta Flow

Periodic sync or manual catchUp() store.catchUp() since = await storage.getLastSyncId() // 1000 GET /sync/v1/delta?since=1000 Server response: SyncFrame[] [ { "sid": 1001, "t": "post", "id": "p1", "op": "u", "p": { "title": "Updated" } }, { "sid": 1002, "t": "user", "id": "u2", "op": "i", "p": { ... } } ] store.applyFrames(frames) For each frame: pool.upsert() // Update memory storage.putRow() // Persist to database bridge.invalidate() // Clear cache storage.setLastSyncId(1002) Delta sync complete ✓

Real-Time Flow

WebSocket connects wss://api.example.com/sync/v1/ws?token=JWT Server authenticates → adds to socket pool Another client makes mutation Server writes SyncAction → broadcasts frame Redis pub/sub → all pods Pod sends frame to WebSocket clients This client receives frame store.applyFrames([frame]) UI updates in real-time ✓

Memory Management

ObjectPool

// Single observable map pool.map = observable.map<string, WireRow>() // Efficient storage // Key: "type:id" → "post:123" // Value: { t: 'post', id: '123', p: { ... } } // O(1) lookups pool.get('post', '123') // Instant // Minimal memory // One copy per entity (no duplicates)

Identity Map (IdentityMap)

// Weak references cache = new Map<string, Model>() // Instances garbage collected when unused // No memory leaks

Lazy References

// References don't hold data author: LazyReference<User> // Just tracks foreign key // Loads on demand // Releases when unused

Next Steps

Last updated on