Gateway

API reference for @lakesync/gateway — SyncGateway, configuration, push/pull, flush, flush queue, sources, actions, and monitoring.

The @lakesync/gateway package provides SyncGateway — the server-side engine that coordinates delta ingestion, conflict resolution, buffering, flush, source adapters, and action dispatch.

SyncGateway

Constructor

class SyncGateway {
  constructor(config: GatewayConfig, adapter?: LakeAdapter | DatabaseAdapter);
}

The adapter parameter is a convenience shorthand for config.adapter.

Note: LakeAdapter and DatabaseAdapter are imported from @lakesync/core (not @lakesync/adapter). The adapter package provides concrete implementations (S3Adapter, PostgresAdapter, etc.), while the interfaces live in core.

GatewayConfig

interface GatewayConfig {
  gatewayId: string;
  maxBufferBytes: number;
  maxBufferAgeMs: number;
  flushFormat?: "json" | "parquet";
  tableSchema?: TableSchema;
  catalogue?: NessieCatalogueClient;
  schemaManager?: SchemaManager;
  adapter?: LakeAdapter | DatabaseAdapter;
  sourceAdapters?: Record<string, DatabaseAdapter>;
  adaptiveBufferConfig?: {
    wideColumnThreshold: number;
    reductionFactor: number;
  };
  maxBackpressureBytes?: number;
  perTableBudgetBytes?: number;
  actionHandlers?: Record<string, ActionHandler>;
  schemas?: ReadonlyArray<TableSchema>;
  materialisers?: ReadonlyArray<Materialisable>;
  flushQueue?: FlushQueue;
  onMaterialisationFailure?: (table: string, deltaCount: number, error: Error) => void;
}
| Field | Default | Description |
| --- | --- | --- |
| `gatewayId` | required | Unique identifier for this gateway instance |
| `maxBufferBytes` | required | Buffer byte threshold for flush |
| `maxBufferAgeMs` | required | Buffer age threshold for flush (ms) |
| `flushFormat` | `"parquet"` | Output format for flush (`"json"` or `"parquet"`) |
| `tableSchema` | — | Required for Parquet flush |
| `adapter` | — | Storage adapter for flush (S3/R2, Postgres, MySQL, BigQuery) |
| `sourceAdapters` | `{}` | Named database adapters for adapter-sourced pull |
| `maxBackpressureBytes` | 2 × `maxBufferBytes` | Reject pushes above this threshold |
| `perTableBudgetBytes` | — | Per-table budget for auto-flush |
| `actionHandlers` | — | Named action handlers for imperative actions |
| `schemas` | — | Table schemas for materialisation after flush (see Materialise Protocol) |
| `materialisers` | — | Additional `Materialisable` targets invoked after flush (non-fatal) |
| `flushQueue` | auto | Flush queue for post-flush materialisation. When absent, a `MemoryFlushQueue` is created from the adapter and materialisers. |
| `onMaterialisationFailure` | — | Callback invoked per table when materialisation fails. Useful for metrics/alerting. |

Example:

import { SyncGateway } from "@lakesync/gateway";

const gateway = new SyncGateway({
  gatewayId: "my-gw",
  maxBufferBytes: 4 * 1024 * 1024,
  maxBufferAgeMs: 30_000,
  adapter: s3Adapter,
  sourceAdapters: { postgres: pgAdapter },
  actionHandlers: { slack: slackHandler },
});

Push / Pull

handlePush(msg)

handlePush(
  msg: SyncPush,
): Result<HandlePushResult, ClockDriftError | SchemaError | BackpressureError>;

Validates HLC drift, resolves conflicts via column-level LWW, and appends deltas to the buffer.

  • Idempotent — re-pushing the same deltaId counts as accepted but is not re-ingested.
  • Backpressure — returns BackpressureError when buffer exceeds maxBackpressureBytes.
  • Schema validation — validates against SchemaManager if configured.
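The column-level LWW rule can be illustrated with a self-contained sketch. The types below are simplified stand-ins, not the real `RowDelta`/`HLCTimestamp` from @lakesync/core:

```typescript
// Simplified stand-in types — the real delta types live in @lakesync/core.
type HLC = bigint;
interface ColumnWrite { value: unknown; hlc: HLC }
type Row = Record<string, ColumnWrite>;

// Column-level last-writer-wins: a column is overwritten only when the
// incoming write carries a strictly newer HLC than the stored one.
function mergeLww(current: Row, incoming: Row): Row {
  const merged: Row = { ...current };
  for (const [col, write] of Object.entries(incoming)) {
    const existing = merged[col];
    if (!existing || write.hlc > existing.hlc) {
      merged[col] = write;
    }
  }
  return merged;
}

// Two clients touch the same row; only the conflicting column is overwritten.
const row = mergeLww(
  { title: { value: "Draft", hlc: 1n }, status: { value: "open", hlc: 1n } },
  { status: { value: "closed", hlc: 2n } },
);
// row.title.value === "Draft", row.status.value === "closed"
```

Because conflicts resolve per column rather than per row, concurrent edits to different columns of the same row both survive.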

handlePull(msg, context?)

// Buffer pull (synchronous)
handlePull(msg: SyncPull, context?: SyncRulesContext): Result<SyncResponse, never>;

// Adapter-sourced pull (async)
handlePull(
  msg: SyncPull & { source: string },
  context?: SyncRulesContext,
): Promise<Result<SyncResponse, AdapterNotFoundError | AdapterError>>;

Two paths:

  1. Buffer pull — returns deltas from the in-memory log since sinceHlc. When a SyncRulesContext is provided, deltas are post-filtered by the client's bucket definitions and JWT claims. Over-fetches 3× the limit and retries up to 5 times to fill the page after filtering.

  2. Adapter-sourced pull — when msg.source is set, queries the named DatabaseAdapter via queryDeltasSince() instead of the buffer. Sync rules filtering applies if context is provided.
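The over-fetch-and-retry strategy for filtered buffer pulls can be sketched as follows. The function names and the cursor mechanics are illustrative, not the gateway's internal API; only the 3× over-fetch factor and the 5-round retry cap come from the behaviour described above:

```typescript
// Sketch of filtered pagination: over-fetch 3× the requested limit, apply the
// sync-rules predicate, and retry (up to 5 rounds) until the page is full or
// the log is exhausted.
function filteredPage<T>(
  log: readonly T[],
  limit: number,
  allowed: (delta: T) => boolean,
): T[] {
  const page: T[] = [];
  let cursor = 0;
  for (let attempt = 0; attempt < 5 && page.length < limit; attempt++) {
    const batch = log.slice(cursor, cursor + limit * 3); // over-fetch 3×
    cursor += batch.length;
    for (const delta of batch) {
      if (allowed(delta) && page.length < limit) page.push(delta);
    }
    if (cursor >= log.length) break; // log exhausted
  }
  return page;
}

// Example: fill a page of 2 from a log where only even entries pass the filter.
const page = filteredPage([1, 2, 3, 4, 5, 6, 7, 8], 2, (n) => n % 2 === 0);
// page → [2, 4]
```

Over-fetching trades a little extra buffer scanning for fewer round trips when sync rules filter out a large fraction of deltas.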

Flush

flush()

flush(): Promise<Result<void, FlushError>>;

Drain the entire buffer and write to the configured adapter.

  • Parquet (default) — writes a Parquet file via writeDeltasToParquet.
  • JSON — writes a FlushEnvelope as JSON.
  • Database adapter — calls insertDeltas() directly.
  • On failure, entries are restored to the buffer for retry.
  • Returns FlushError if no adapter is configured or if a flush is already in progress.

flushTable(table)

flushTable(table: string): Promise<Result<void, FlushError>>;

Flush a single table's deltas, leaving other tables in the buffer. Same write logic as flush().

shouldFlush()

shouldFlush(): boolean;

Check if the buffer should be flushed based on maxBufferBytes and maxBufferAgeMs thresholds. Respects adaptiveBufferConfig for wide-column deltas.

getTablesExceedingBudget()

getTablesExceedingBudget(): string[];

Returns table names whose buffer size exceeds perTableBudgetBytes. Returns empty array when no budget is configured.
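The threshold checks behind shouldFlush() and getTablesExceedingBudget() amount to simple comparisons. A sketch, with field names mirroring GatewayConfig but internals that are purely illustrative:

```typescript
// Illustrative threshold logic: flush when either the byte or the age
// threshold is crossed; budget checks are skipped when unconfigured.
interface Thresholds {
  maxBufferBytes: number;
  maxBufferAgeMs: number;
  perTableBudgetBytes?: number;
}

function shouldFlush(t: Thresholds, byteSize: number, oldestAgeMs: number): boolean {
  return byteSize >= t.maxBufferBytes || oldestAgeMs >= t.maxBufferAgeMs;
}

function tablesExceedingBudget(
  t: Thresholds,
  perTableBytes: Record<string, number>,
): string[] {
  if (t.perTableBudgetBytes === undefined) return []; // no budget configured
  const budget = t.perTableBudgetBytes;
  return Object.entries(perTableBytes)
    .filter(([, bytes]) => bytes > budget)
    .map(([table]) => table);
}

const t = { maxBufferBytes: 4 * 1024 * 1024, maxBufferAgeMs: 30_000, perTableBudgetBytes: 1024 };
shouldFlush(t, 5 * 1024 * 1024, 100);   // → true  (byte threshold crossed)
shouldFlush(t, 1024, 60_000);           // → true  (age threshold crossed)
tablesExceedingBudget(t, { orders: 2048, users: 512 }); // → ["orders"]
```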

rehydrate(deltas)

rehydrate(deltas: ReadonlyArray<RowDelta>): void;

Restore persisted deltas into the buffer without push validation (no HLC drift check, no conflict resolution, no schema validation). Used by GatewayServer to rehydrate from SQLite persistence on startup.

Source Adapters

Named database adapters for adapter-sourced pulls. Clients pull from a source by passing source: "name" in the pull request.

registerSource(name, adapter)

registerSource(name: string, adapter: DatabaseAdapter): void;

unregisterSource(name)

unregisterSource(name: string): void;

listSources()

listSources(): string[];

Example:

import { PostgresAdapter } from "@lakesync/adapter";

const pg = new PostgresAdapter({ connectionString: "..." });
gateway.registerSource("analytics", pg);

// Client can now pull from this source
coordinator.pullFrom("analytics");

Actions

Execute imperative operations against external systems via registered ActionHandlers. See Actions for the full guide.

handleAction(msg, context?)

handleAction(
  msg: ActionPush,
  context?: AuthContext,
): Promise<Result<ActionResponse, ActionValidationError>>;

Dispatches each action to the matching handler by connector name. Supports:

  • actionId deduplication — re-submitting the same ID returns the cached result.
  • idempotencyKey deduplication — different actions with the same key return the first result.
  • Retryable errors are not cached — the client can retry with the same actionId.
  • Auth context is forwarded to executeAction() for fine-grained permissions.
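The two deduplication paths can be illustrated with a self-contained cache sketch. The class and its shape are hypothetical, not the gateway's internal implementation; only the caching rules come from the list above:

```typescript
// Illustrative dedup cache: results are cached by actionId and (optionally)
// idempotencyKey; retryable failures are never cached, so the client can
// retry with the same actionId.
interface ActionResult { ok: boolean; retryable?: boolean; data?: unknown }

class ActionDedup {
  private byId = new Map<string, ActionResult>();
  private byKey = new Map<string, ActionResult>();

  execute(
    actionId: string,
    idempotencyKey: string | undefined,
    run: () => ActionResult,
  ): ActionResult {
    const cached =
      this.byId.get(actionId) ??
      (idempotencyKey ? this.byKey.get(idempotencyKey) : undefined);
    if (cached) return cached; // re-submission returns the first result
    const result = run();
    if (result.ok || !result.retryable) {
      this.byId.set(actionId, result);
      if (idempotencyKey) this.byKey.set(idempotencyKey, result);
    }
    return result;
  }
}

const dedup = new ActionDedup();
let calls = 0;
const first = dedup.execute("a1", "pay-42", () => ({ ok: true, data: ++calls }));
const again = dedup.execute("a1", undefined, () => ({ ok: true, data: ++calls }));
// again === first — the handler ran only once (calls === 1)
```

A different action with the same idempotencyKey (`"pay-42"` here) also short-circuits to the first result, which is what makes the key safe to reuse across client retries of the same logical operation.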

registerActionHandler(name, handler)

registerActionHandler(name: string, handler: ActionHandler): void;

unregisterActionHandler(name)

unregisterActionHandler(name: string): void;

listActionHandlers()

listActionHandlers(): string[];

describeActions()

describeActions(): ActionDiscovery;

Returns a map of connector name → supported ActionDescriptor[]. Used by the discovery endpoint (GET /sync/:id/actions) and the client's describeActions() method.

Example:

gateway.registerActionHandler("github", {
  supportedActions: [
    { actionType: "create_pr", description: "Create a pull request" },
  ],
  async executeAction(action) {
    // Call GitHub API...
    return Ok({ actionId: action.actionId, data: { url: "..." }, serverHlc: 0n as HLCTimestamp });
  },
});

gateway.describeActions();
// { connectors: { github: [{ actionType: "create_pr", description: "..." }] } }

Flush Queue

The flush queue decouples flush from materialisation. After a successful flush, deltas are published to the queue for downstream processing. The gateway creates a MemoryFlushQueue by default (inline materialisation), but you can provide an R2FlushQueue or a custom implementation for async processing.

FlushQueue

interface FlushQueue {
  publish(
    entries: ReadonlyArray<RowDelta>,
    context: FlushContext,
  ): Promise<Result<void, FlushQueueError>>;
}

FlushContext

interface FlushContext {
  gatewayId: string;
  schemas: ReadonlyArray<TableSchema>;
}

isFlushQueue(value)

function isFlushQueue(value: unknown): value is FlushQueue;

Duck-typed type guard (same pattern as isMaterialisable).

MemoryFlushQueue

class MemoryFlushQueue implements FlushQueue {
  constructor(
    materialisers: ReadonlyArray<Materialisable>,
    onFailure?: (table: string, deltaCount: number, error: Error) => void,
  );
}

Calls processMaterialisation() inline. This is the default when no flushQueue is provided in GatewayConfig.

R2FlushQueue

class R2FlushQueue implements FlushQueue {
  constructor(adapter: LakeAdapter);
}

Writes serialised delta batches to object storage under materialise-jobs/{gatewayId}/{timestamp}-{rand}.json. A separate polling consumer lists this prefix, processes each batch, then deletes the object. No queue service required.
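A polling consumer for this prefix might look like the sketch below. The `ObjectStore` interface here (list/get/delete) is an assumption standing in for the adapter's object operations — check LakeAdapter's actual API before reusing this shape:

```typescript
// Hypothetical polling consumer for R2FlushQueue job objects: list the
// gateway's prefix, process each batch, then delete the object so it is
// not processed twice.
interface ObjectStore {
  list(prefix: string): Promise<string[]>;
  get(key: string): Promise<string>;
  delete(key: string): Promise<void>;
}

async function drainMaterialiseJobs(
  store: ObjectStore,
  gatewayId: string,
  process: (batch: unknown) => Promise<void>,
): Promise<number> {
  const prefix = `materialise-jobs/${gatewayId}/`;
  const keys = await store.list(prefix);
  for (const key of keys) {
    const batch = JSON.parse(await store.get(key));
    await process(batch);    // materialise the batch…
    await store.delete(key); // …then remove the job object
  }
  return keys.length; // number of jobs drained this poll
}
```

Deleting only after `process` resolves gives at-least-once processing: a crash mid-batch leaves the object in place for the next poll, so downstream materialisers should tolerate replays.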

processMaterialisation(entries, schemas, config)

function processMaterialisation(
  entries: ReadonlyArray<RowDelta>,
  schemas: ReadonlyArray<TableSchema>,
  config: MaterialisationProcessorConfig,
): Promise<void>;

Runs materialisation targets against flushed deltas. Iterates over the targets, catching failures per table and invoking onFailure for each; it never throws.

MaterialisationProcessorConfig

interface MaterialisationProcessorConfig {
  materialisers: ReadonlyArray<Materialisable>;
  onFailure?: (table: string, deltaCount: number, error: Error) => void;
}

collectMaterialisers(adapter, extra?)

function collectMaterialisers(
  adapter: unknown,
  extra?: ReadonlyArray<Materialisable>,
): Materialisable[];

Builds the full list of materialisation targets from an adapter (if it implements Materialisable) and any explicit extra materialisers.

FlushCoordinatorDeps

interface FlushCoordinatorDeps {
  config: FlushConfig;
  schemas?: ReadonlyArray<TableSchema>;
  flushQueue?: FlushQueue;
}

Example — inline materialisation (default):

import { SyncGateway } from "@lakesync/gateway";

const gateway = new SyncGateway({
  gatewayId: "my-gw",
  maxBufferBytes: 4 * 1024 * 1024,
  maxBufferAgeMs: 30_000,
  adapter: pgAdapter,          // Materialisable — auto-creates MemoryFlushQueue
  schemas: [{ table: "orders", columns: [{ name: "total", type: "number" }] }],
  onMaterialisationFailure: (table, count, err) => {
    console.error(`Materialise failed: ${table} (${count} deltas)`, err);
  },
});

Example — async via R2:

import { SyncGateway, R2FlushQueue } from "@lakesync/gateway";

const gateway = new SyncGateway({
  gatewayId: "my-gw",
  maxBufferBytes: 4 * 1024 * 1024,
  maxBufferAgeMs: 30_000,
  adapter: s3Adapter,
  flushQueue: new R2FlushQueue(s3Adapter),   // overrides default MemoryFlushQueue
  schemas: [{ table: "orders", columns: [{ name: "total", type: "number" }] }],
});

Monitoring

bufferStats

get bufferStats(): { logSize: number; indexSize: number; byteSize: number };

  • logSize — total number of deltas in the buffer.
  • indexSize — number of unique row keys tracked.
  • byteSize — estimated byte size of buffered deltas.

tableStats

get tableStats(): Array<{ table: string; byteSize: number; deltaCount: number }>;

Per-table breakdown of buffer contents.

Types

HandlePushResult

interface HandlePushResult {
  serverHlc: HLCTimestamp;
  accepted: number;
  deltas: RowDelta[];
}

FlushEnvelope

interface FlushEnvelope {
  version: 1;
  gatewayId: string;
  createdAt: string;
  hlcRange: { min: HLCTimestamp; max: HLCTimestamp };
  deltaCount: number;
  byteSize: number;
  deltas: RowDelta[];
}
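To make the envelope fields concrete, here is a hedged sketch of how one could be assembled from a drained batch. The helper and its byte-size estimate are illustrative; only the field shapes come from the interface above:

```typescript
// Illustrative FlushEnvelope construction: hlcRange spans the batch's min/max
// HLCs, and byteSize here is a rough JSON-length estimate (an assumption —
// the gateway's own sizing may differ).
type HLC = bigint;
interface Delta { hlc: HLC }

function buildEnvelope(gatewayId: string, deltas: Delta[]) {
  const hlcs = deltas.map((d) => d.hlc);
  return {
    version: 1 as const,
    gatewayId,
    createdAt: new Date().toISOString(),
    hlcRange: {
      min: hlcs.reduce((a, b) => (b < a ? b : a)),
      max: hlcs.reduce((a, b) => (b > a ? b : a)),
    },
    deltaCount: deltas.length,
    byteSize: JSON.stringify(deltas, (_, v) =>
      typeof v === "bigint" ? v.toString() : v,
    ).length,
    deltas,
  };
}

const env = buildEnvelope("my-gw", [{ hlc: 3n }, { hlc: 1n }, { hlc: 2n }]);
// env.hlcRange → { min: 1n, max: 3n }, env.deltaCount → 3
```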

GatewayState

interface GatewayState {
  hlc: HLCTimestamp;
  flushing: boolean;
}

Gateway Server

The @lakesync/gateway-server package wraps SyncGateway in a standalone HTTP server for self-hosted deployments. See Architecture for deployment options.

Routes

| Method | Path | Auth | Description |
| --- | --- | --- | --- |
| POST | `/sync/:id/push` | JWT | Push deltas |
| POST | `/sync/:id/pull` | JWT | Pull deltas |
| POST | `/sync/:id/action` | JWT | Execute actions |
| GET | `/sync/:id/actions` | JWT | Discover available actions |
| GET | `/sync/:id/checkpoint` | JWT | Download checkpoint |
| POST | `/admin/flush/:id` | JWT | Trigger flush |
| POST | `/admin/schema/:id` | JWT | Update schema |
| POST | `/admin/sync-rules/:id` | JWT | Update sync rules |
| POST | `/admin/connectors/:id` | JWT (admin) | Register a connector |
| GET | `/admin/connectors/:id` | JWT (admin) | List connectors |
| DELETE | `/admin/connectors/:id/:name` | JWT (admin) | Unregister a connector |
| GET | `/sync/:id/metrics` | JWT | Buffer statistics |
| GET | `/health` | None | Health check |

Configuration

import { GatewayServer } from "@lakesync/gateway-server";

const server = new GatewayServer({
  gatewayId: "my-gw",
  port: 8080,
  adapter: s3Adapter,                    // optional storage adapter
  maxBufferBytes: 4 * 1024 * 1024,       // default 4 MiB
  maxBufferAgeMs: 30_000,                // default 30s
  jwtSecret: process.env.JWT_SECRET,     // optional — disables auth when absent
  allowedOrigins: ["http://localhost:5173"], // optional CORS origins
  persistence: "sqlite",                 // "memory" (default) or "sqlite"
  flushIntervalMs: 30_000,              // periodic flush interval (default 30s)
  ingestSources: [],                     // optional polling ingest sources
});

await server.start();

Gateway Worker (Cloudflare)

The @lakesync/gateway-worker package deploys SyncGateway as a Cloudflare Worker with Durable Objects.

Routes

Same as gateway-server above. All routes except /health require JWT authentication.

Table Sharding

Optional table-level sharding across multiple Durable Objects:

{
  "shards": {
    "orders": "shard-orders",
    "products": "shard-products"
  },
  "default": "shard-default"
}

Set via SHARD_CONFIG environment variable (JSON string). When absent, sharding is disabled.

  • Push — partitions deltas by table and fans out to the correct shard DOs.
  • Pull — fans out to all shards, merges results sorted by HLC.
  • Admin ops — applied to all shards.
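The routing rule implied by the config shape above is a lookup with a fallback. A sketch (the helper names are illustrative, not worker internals):

```typescript
// Illustrative table→shard routing per the SHARD_CONFIG shape: explicit
// mappings win, everything else falls back to the "default" shard.
interface ShardConfig { shards: Record<string, string>; default: string }

function shardFor(config: ShardConfig, table: string): string {
  return config.shards[table] ?? config.default;
}

// Push-side partitioning: group a mixed batch of deltas by destination shard.
function partitionByShard<T extends { table: string }>(
  config: ShardConfig,
  deltas: T[],
): Map<string, T[]> {
  const byShard = new Map<string, T[]>();
  for (const delta of deltas) {
    const shard = shardFor(config, delta.table);
    const bucket = byShard.get(shard) ?? [];
    bucket.push(delta);
    byShard.set(shard, bucket);
  }
  return byShard;
}

const config = {
  shards: { orders: "shard-orders", products: "shard-products" },
  default: "shard-default",
};
const parts = partitionByShard(config, [
  { table: "orders" }, { table: "users" }, { table: "orders" },
]);
// parts.get("shard-orders")!.length === 2, parts.get("shard-default")!.length === 1
```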