LakeSync

Architecture

System architecture, key patterns, and design decisions.

Overview

LakeSync is a declarative data sync engine. Adapters connect any system you can read from or write to, and the core loop follows a push/pull model: consumers push local changes and pull filtered subsets via the gateway. Data flows between any source and any destination (local SQLite, Postgres, MySQL, BigQuery, S3/Iceberg), and every adapter is both a source and a destination.

Adapters

Any data source you can read from becomes a LakeSync adapter. Adapters are both sources and destinations — enabling bidirectional sync and cross-backend flows.

Adapter           | Interface        | Use case
Postgres / MySQL  | DatabaseAdapter  | Operational OLTP data, familiar SQL tooling
BigQuery          | DatabaseAdapter  | Analytics-scale queries, managed and serverless
S3 / R2 (Iceberg) | LakeAdapter      | Massive scale on object storage, open format
Jira Cloud        | API Connector    | Sync issues, comments, and projects from Jira
Salesforce        | API Connector    | Sync accounts, contacts, opportunities, and leads from Salesforce
Anything else     | Either interface | CloudWatch, Stripe, custom APIs — implement the interface

The DatabaseAdapter interface (insertDeltas, queryDeltasSince, getLatestState, ensureSchema) handles SQL-like sources. The LakeAdapter interface (putObject, getObject, listObjects, deleteObject) handles object storage. The CompositeAdapter routes data to multiple backends simultaneously.
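
As a rough sketch of the two shapes (parameter and return types here are illustrative assumptions, not the published signatures):

// Illustrative only; consult the package typings for the exact signatures.
interface DatabaseAdapterShape {
  insertDeltas(deltas: RowDelta[]): Promise<Result<void, Error>>;
  queryDeltasSince(hlc: HLCTimestamp): Promise<Result<RowDelta[], Error>>;
  getLatestState(table: string): Promise<Result<Record<string, unknown>[], Error>>;
  ensureSchema(schemas: TableSchema[]): Promise<Result<void, Error>>;
}

interface LakeAdapterShape {
  putObject(key: string, body: Uint8Array): Promise<Result<void, Error>>;
  getObject(key: string): Promise<Result<Uint8Array, Error>>;
  listObjects(prefix: string): Promise<Result<string[], Error>>;
  deleteObject(key: string): Promise<Result<void, Error>>;
}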

The Delta as Universal Format

RowDelta is the intermediate representation that makes any-to-any sync possible. Every source converts its native format into deltas. Every destination consumes deltas. This means N sources + M destinations = N + M adapters, not N × M connectors.

Adding a new source (e.g. CloudWatch, Stripe) automatically enables sync to every existing destination — no pairwise integration needed. The same applies in reverse: a new destination adapter immediately works with all existing sources.

Hybrid Logical Clocks (HLC)

Every mutation is timestamped with an HLCTimestamp — a branded bigint encoding 48 bits of wall-clock time and 16 bits of a monotonic counter. This provides:

  • Causal ordering across clients without centralised coordination
  • Deterministic tiebreaking — when timestamps are equal, the higher clientId wins
  • Compact representation — a single 64-bit value instead of a timestamp + counter pair
import { HLC, type HLCTimestamp } from "lakesync";

const hlc = new HLC();
const ts: HLCTimestamp = hlc.now(); // a single 64-bit value: 48-bit wall clock + 16-bit counter
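
Because both components are packed into one bigint, ordering is just numeric comparison. A decode sketch, assuming the conventional layout with the wall clock in the high 48 bits:

// Assumed bit layout: high 48 bits = wall-clock milliseconds, low 16 bits = counter.
const raw = ts as unknown as bigint;   // HLCTimestamp is a branded bigint
const wallMs = Number(raw >> 16n);     // milliseconds since epoch
const counter = Number(raw & 0xffffn); // counter within the same millisecond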

Delta Model

A RowDelta represents a single mutation to a single row. Rather than storing full row snapshots, LakeSync stores column-level changes:

interface RowDelta {
  op: "INSERT" | "UPDATE" | "DELETE";
  deltaId: string;       // SHA-256 of stable-stringified payload
  table: string;
  rowId: string;
  clientId: string;
  columns: ColumnDelta[]; // Array of { column, value } pairs
  hlc: HLCTimestamp;
}

interface ColumnDelta {
  column: string;
  value: unknown;
}

Deterministic deltaId generation ensures that the same logical change always produces the same identifier, enabling idempotent processing.
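
For example, an UPDATE that touches two columns of one row might look like this (the identifier values are made up for illustration):

const delta: RowDelta = {
  op: "UPDATE",
  deltaId: "3fa1…",        // SHA-256 of the stable-stringified payload
  table: "todos",
  rowId: "todo-42",
  clientId: "client-a",
  columns: [
    { column: "title", value: "Buy milk" },
    { column: "done", value: true },
  ],
  hlc: hlc.now(),          // HLCTimestamp from the HLC instance shown earlier
};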

Conflict Resolution

LakeSync uses column-level last-writer-wins (LWW):

  1. For each column in a delta, compare the incoming HLC with the stored HLC for that column
  2. If the incoming HLC is greater, the incoming value wins
  3. If HLCs are equal, the higher clientId wins (deterministic tiebreak)
  4. Columns not present in the incoming delta are left untouched

This means two clients can edit different columns of the same row concurrently without either change being lost.
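
A minimal sketch of that merge, assuming a per-column index keyed by table:rowId:column (the names here are illustrative, not the engine's internals):

interface ColumnVersion {
  value: unknown;
  hlc: HLCTimestamp;
  clientId: string;
}

// Column-level LWW: a later HLC wins; equal HLCs fall back to the higher clientId.
function applyDelta(index: Map<string, ColumnVersion>, delta: RowDelta): void {
  for (const { column, value } of delta.columns) {
    const key = `${delta.table}:${delta.rowId}:${column}`;
    const current = index.get(key);
    const wins =
      !current ||
      delta.hlc > current.hlc ||
      (delta.hlc === current.hlc && delta.clientId > current.clientId);
    if (wins) {
      index.set(key, { value, hlc: delta.hlc, clientId: delta.clientId });
    }
    // Columns absent from the delta are never touched.
  }
}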

Result Pattern

Public APIs never throw. Instead, they return Result<T, E>:

import { Ok, Err, type Result } from "lakesync";

function divide(a: number, b: number): Result<number, string> {
  if (b === 0) return Err("Division by zero");
  return Ok(a / b);
}

const result = divide(10, 2);
if (result.ok) {
  console.log(result.value); // 5
} else {
  console.error(result.error);
}

DeltaBuffer

The gateway stores deltas in a DeltaBuffer — a dual data structure with:

  • Log — an append-only array of deltas in insertion order (for pull)
  • Index — a map from table:rowId to the latest delta per column (for conflict resolution)
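
A simplified sketch of the two halves (the class and method names are illustrative, and the LWW check described above is omitted):

class DeltaBufferSketch {
  private readonly log: RowDelta[] = [];                             // append-only, insertion order
  private readonly index = new Map<string, Map<string, RowDelta>>(); // "table:rowId" → column → latest delta

  append(delta: RowDelta): void {
    this.log.push(delta);                                            // served on pull
    const key = `${delta.table}:${delta.rowId}`;
    const row = this.index.get(key) ?? new Map<string, RowDelta>();
    for (const { column } of delta.columns) row.set(column, delta);  // read during conflict resolution
    this.index.set(key, row);
  }

  pullSince(hlc: HLCTimestamp): RowDelta[] {
    return this.log.filter((d) => d.hlc > hlc);
  }
}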

Adapter-Sourced Pull

The gateway can pull data from named source adapters in addition to its in-memory buffer. A consumer declares what it needs (e.g. "give me errors from the last 24h from BigQuery") and the gateway queries the adapter, applies sync rules, and returns filtered deltas.

// Client-side: pull from a named source adapter
const coordinator = new SyncCoordinator(db, transport);
await coordinator.pullFrom("bigquery");

Source adapters are registered on the gateway via sourceAdapters in GatewayConfig. The source field on SyncPull selects which adapter to query. When omitted, the existing in-memory buffer path is used (backwards compatible).

Sync Rules

Sync rules define which data each client can see. Rules are organised into buckets with filter operators:

Operator | Description                | Example
eq       | Equals (exact match)       | { "op": "eq", "value": "jwt:sub" }
in       | Contained in (multi-value) | { "op": "in", "value": "jwt:roles" }
neq      | Not equals                 | { "op": "neq", "value": "archived" }
gt       | Greater than               | { "op": "gt", "value": "100" }
lt       | Less than                  | { "op": "lt", "value": "50" }
gte      | Greater than or equal      | { "op": "gte", "value": "0" }
lte      | Less than or equal         | { "op": "lte", "value": "1000" }

Comparison operators (gt, lt, gte, lte) use numeric comparison when both values parse as numbers, falling back to string comparison via localeCompare().

{
  "buckets": [{
    "name": "user-data",
    "filters": [
      { "column": "user_id", "op": "eq", "value": "jwt:sub" }
    ],
    "tables": ["todos", "preferences"]
  }]
}

The jwt: prefix references claims from the client's JWT. At pull time, the gateway evaluates sync rules against the client's token claims and returns only matching deltas.
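
Conceptually, evaluating a single filter against a delta's column values and the client's claims might look like this (a sketch only; resolveValue, compare, and matches are illustrative names, not the gateway's API):

type Filter = { column: string; op: string; value: string };

// Resolves "jwt:<claim>" references against the client's token claims.
function resolveValue(value: string, claims: Record<string, unknown>): unknown {
  return value.startsWith("jwt:") ? claims[value.slice(4)] : value;
}

// Numeric comparison when both sides parse as numbers, otherwise localeCompare.
function compare(a: unknown, b: unknown): number {
  const an = Number(a);
  const bn = Number(b);
  if (!Number.isNaN(an) && !Number.isNaN(bn)) return an - bn;
  return String(a).localeCompare(String(b));
}

function matches(filter: Filter, row: Record<string, unknown>, claims: Record<string, unknown>): boolean {
  const expected = resolveValue(filter.value, claims);
  const actual = row[filter.column];
  switch (filter.op) {
    case "eq":  return actual === expected;
    case "neq": return actual !== expected;
    case "in":  return Array.isArray(expected) && expected.includes(actual);
    case "gt":  return compare(actual, expected) > 0;
    case "lt":  return compare(actual, expected) < 0;
    case "gte": return compare(actual, expected) >= 0;
    case "lte": return compare(actual, expected) <= 0;
    default:    return false;
  }
}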

Real-Time Sync

When a client pushes deltas, the gateway broadcasts them to all other connected WebSocket clients immediately — filtered by each client's sync rules. HTTP polling remains as a fallback.

The WebSocketTransport uses the same binary protobuf protocol as HTTP with tag-based framing (0x01 push, 0x02 pull, 0x03 broadcast). It reconnects automatically on disconnect with exponential backoff. When realtime is active, polling drops to a 60-second heartbeat to catch missed deltas.
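
The framing is simple to picture. A conceptual sketch (the exact wire format is owned by the transport; this only illustrates the tag-plus-payload idea):

// One tag byte identifying the message kind, followed by the protobuf-encoded body.
const TAG_PUSH = 0x01;
const TAG_PULL = 0x02;
const TAG_BROADCAST = 0x03;

function frame(tag: number, payload: Uint8Array): Uint8Array {
  const out = new Uint8Array(1 + payload.length);
  out[0] = tag;
  out.set(payload, 1);
  return out;
}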

See Real-Time Sync for setup and configuration details.

Source Polling Ingest

External databases (Neon, Postgres, MySQL) and APIs (Jira) may have data written by services outside LakeSync. The Source Polling Ingest system bridges that gap — it periodically queries source tables or APIs, detects changes, and pushes them through the gateway as deltas.

Two change detection strategies are available for database sources: cursor (fast, requires a monotonically increasing column like updated_at) and diff (slower, detects hard deletes via full-table comparison). API connectors like Jira and Salesforce use their own strategies internally. All pollers run inside GatewayServer — no HTTP hop, no auth needed. Detected changes flow through the same conflict resolution and sync rules as client-originated deltas.
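
As a rough illustration of the cursor strategy (not the actual poller; the source client shape and SQL are assumptions), each cycle selects rows changed since the stored cursor and advances it:

// Hypothetical cursor-based change detection against a Postgres-style source.
async function pollOnce(
  source: { query(sql: string, params: unknown[]): Promise<Record<string, unknown>[]> },
  table: string,
  cursor: string, // last observed updated_at value
): Promise<{ rows: Record<string, unknown>[]; nextCursor: string }> {
  const rows = await source.query(
    `SELECT * FROM ${table} WHERE updated_at > $1 ORDER BY updated_at ASC`,
    [cursor],
  );
  const last = rows.at(-1);
  // Rows are then converted to RowDeltas and pushed through the gateway.
  return { rows, nextCursor: last ? String(last.updated_at) : cursor };
}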

The BaseSourcePoller includes a memory-managed ingestion pipeline. Pollers accumulate deltas and push them in configurable chunks (default 500). When the gateway implements the IngestTarget interface, the poller monitors buffer memory pressure and triggers flushes automatically. If the gateway rejects a push due to backpressure, the poller flushes and retries — preventing unbounded memory growth during large ingest operations.
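
A simplified sketch of the chunked push loop described above (the ingest target shape is an assumption; the real BaseSourcePoller also watches buffer memory pressure):

// Pushes accumulated deltas in chunks; flushes and retries when the target signals backpressure.
async function pushInChunks(
  target: { push(deltas: RowDelta[]): Promise<{ accepted: boolean }>; flush(): Promise<void> },
  pending: RowDelta[],
  chunkSize = 500, // default chunk size
): Promise<void> {
  for (let i = 0; i < pending.length; i += chunkSize) {
    const chunk = pending.slice(i, i + chunkSize);
    const result = await target.push(chunk);
    if (!result.accepted) {
      await target.flush();     // relieve memory pressure
      await target.push(chunk); // retry the rejected chunk
    }
  }
}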

See Source Polling Ingest for database polling setup and Dynamic Connectors for API connectors like Jira and Salesforce.

Materialise Protocol

After flushing deltas to a storage adapter, the gateway can optionally materialise them into queryable destination tables. All three database adapters (Postgres, MySQL, BigQuery) implement the Materialisable interface, producing denormalised, up-to-date tables from the delta log — making the data directly queryable via SQL without replaying deltas. The materialisation algorithm is generic: a shared executeMaterialise() function works with any SQL database via the pluggable SqlDialect interface.

Destination tables follow a hybrid column model:

  • Synced columns — written by the materialiser, derived from TableSchema.columns
  • props JSONB DEFAULT '{}' — consumer-extensible column, never touched by the materialiser
  • deleted_at — set on soft-delete (enabled by default), NULL for live rows
  • synced_at — updated on every materialise cycle

The isMaterialisable(adapter) type guard checks at runtime whether an adapter supports materialisation. The gateway triggers materialisation automatically after a successful flush when schemas are configured in GatewayConfig.

import { isMaterialisable } from "@lakesync/core";

if (isMaterialisable(adapter)) {
  await adapter.materialise(deltas, schemas);
}

SqlDialect Pattern

Materialisation is dialect-agnostic. The SqlDialect interface abstracts SQL differences (type mapping, parameter syntax, upsert syntax) so the core algorithm works identically across all databases:

Adapter  | Dialect            | Upsert Strategy         | Parameter Style
Postgres | PostgresSqlDialect | ON CONFLICT DO UPDATE   | $N positional
MySQL    | MySqlDialect       | ON DUPLICATE KEY UPDATE | ? positional
BigQuery | BigQuerySqlDialect | MERGE statement         | @name named

Adding materialisation to a new database adapter requires implementing four methods on SqlDialect:

interface SqlDialect {
  createDestinationTable(dest, schema, pk, softDelete): { sql; params };
  queryDeltaHistory(sourceTable, rowIds): { sql; params };
  buildUpsert(dest, schema, conflictCols, softDelete, upserts): { sql; params };
  buildDelete(dest, deleteIds, softDelete): { sql; params };
}

The adapter also needs a QueryExecutor (minimal query() + queryRows() interface) and the Materialisable.materialise() method that delegates to executeMaterialise():

class MyAdapter implements DatabaseAdapter, Materialisable {
  private readonly dialect = new MySqlDialect();
  private readonly executor: QueryExecutor; // minimal query() + queryRows() executor, initialised in the constructor

  async materialise(deltas, schemas) {
    return executeMaterialise(this.executor, this.dialect, deltas, schemas);
  }
}

Composite Primary Keys

By default, destination tables use row_id as their primary key. Set primaryKey on TableSchema to use a composite key:

const schema: TableSchema = {
  table: "order_items",
  columns: [
    { name: "order_id", type: "string" },
    { name: "product_id", type: "string" },
    { name: "quantity", type: "number" },
  ],
  primaryKey: ["order_id", "product_id"],
};

Each entry must be "row_id" or an existing column name. All three database adapters (Postgres, MySQL, BigQuery) generate the appropriate PRIMARY KEY constraint.

Soft Delete

Tombstoned rows are soft-deleted by default — the materialiser sets deleted_at to the current timestamp instead of removing the row. This preserves audit trails and enables un-delete when a row is re-inserted.

Set softDelete: false to opt out and hard-delete tombstoned rows:

const schema: TableSchema = {
  table: "events",
  columns: [{ name: "payload", type: "json" }],
  softDelete: false, // DELETE instead of UPDATE SET deleted_at
};

When soft delete is enabled (the default), upserting a previously deleted row automatically clears deleted_at back to NULL.

External ID Column

When integrating with external systems (Jira, Salesforce, etc.), rows often have an external identifier that should be used for deduplication instead of row_id. Set externalIdColumn to resolve upserts on that column:

const schema: TableSchema = {
  table: "jira_issues",
  columns: [
    { name: "jira_key", type: "string" },
    { name: "summary", type: "string" },
    { name: "status", type: "string" },
  ],
  externalIdColumn: "jira_key",
};

This adds a UNIQUE constraint on the column and uses it for ON CONFLICT targeting (Postgres/BigQuery) or ON DUPLICATE KEY resolution (MySQL). The primary key remains unchanged.

Helper Functions

Three helpers are exported from @lakesync/adapter for use in custom materialisation logic:

import { resolvePrimaryKey, resolveConflictColumns, isSoftDelete } from "@lakesync/adapter";

resolvePrimaryKey(schema);      // string[] — defaults to ["row_id"]
resolveConflictColumns(schema); // string[] — externalIdColumn or PK
isSoftDelete(schema);           // boolean — defaults to true

Checkpoints

For initial sync (when lastSyncedHlc === 0), the client downloads a checkpoint — a complete snapshot of the current state. Checkpoints are:

  • Generated post-compaction from base Parquet files
  • Encoded as Protocol Buffer chunks, one per table
  • Sized to a byte budget (default 16 MB per chunk)
  • Filtered at serve time using the client's sync rules (the stored checkpoint contains all rows)
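
Conceptually, the client-side branch looks like this (a sketch only; the callback names are hypothetical and the coordinator handles this internally):

// A zero cursor means "download the checkpoint first"; otherwise pull incrementally.
async function sync(
  lastSyncedHlc: bigint,
  downloadCheckpoint: () => Promise<void>,         // hypothetical: streams protobuf chunks, one per table
  pullDeltasSince: (hlc: bigint) => Promise<void>, // hypothetical: incremental pull path
): Promise<void> {
  if (lastSyncedHlc === 0n) {
    await downloadCheckpoint();
  } else {
    await pullDeltasSince(lastSyncedHlc);
  }
}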