LakeSync

Gateway

API reference for @lakesync/gateway — SyncGateway, configuration, push/pull, flush, sources, actions, and monitoring.

The @lakesync/gateway package provides SyncGateway — the server-side engine that coordinates delta ingestion, conflict resolution, buffering, flush, source adapters, and action dispatch.

SyncGateway

Constructor

class SyncGateway {
  constructor(config: GatewayConfig, adapter?: LakeAdapter | DatabaseAdapter);
}

The adapter parameter is a convenience shorthand for config.adapter.

Note: LakeAdapter and DatabaseAdapter are imported from @lakesync/core (not @lakesync/adapter). The adapter package provides concrete implementations (S3Adapter, PostgresAdapter, etc.), while the interfaces live in core.

GatewayConfig

interface GatewayConfig {
  gatewayId: string;
  maxBufferBytes: number;
  maxBufferAgeMs: number;
  flushFormat?: "json" | "parquet";
  tableSchema?: TableSchema;
  catalogue?: NessieCatalogueClient;
  schemaManager?: SchemaManager;
  adapter?: LakeAdapter | DatabaseAdapter;
  sourceAdapters?: Record<string, DatabaseAdapter>;
  adaptiveBufferConfig?: {
    wideColumnThreshold: number;
    reductionFactor: number;
  };
  maxBackpressureBytes?: number;
  perTableBudgetBytes?: number;
  actionHandlers?: Record<string, ActionHandler>;
  schemas?: ReadonlyArray<TableSchema>;
}

| Field | Default | Description |
|---|---|---|
| gatewayId | required | Unique identifier for this gateway instance |
| maxBufferBytes | required | Buffer byte threshold for flush |
| maxBufferAgeMs | required | Buffer age threshold for flush (ms) |
| flushFormat | "parquet" | Output format for flush ("json" or "parquet") |
| tableSchema | | Required for Parquet flush |
| adapter | | Storage adapter for flush (S3/R2, Postgres, MySQL, BigQuery) |
| sourceAdapters | {} | Named database adapters for adapter-sourced pull |
| maxBackpressureBytes | 2 × maxBufferBytes | Reject pushes above this threshold |
| perTableBudgetBytes | | Per-table budget for auto-flush |
| actionHandlers | | Named action handlers for imperative actions |
| schemas | | Table schemas for materialisation after flush (see Materialise Protocol) |
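
The defaults in the table can be read as a resolution step applied to the config before the gateway starts. The sketch below uses a hypothetical helper, resolveGatewayDefaults, over a trimmed-down GatewayConfigSubset type; neither is a LakeSync export, and only the defaults the table states are encoded.

```typescript
// Hypothetical subset of GatewayConfig: only fields with a documented default,
// plus the required ones. Adapter types are reduced to `unknown`.
interface GatewayConfigSubset {
  gatewayId: string;
  maxBufferBytes: number;
  maxBufferAgeMs: number;
  flushFormat?: "json" | "parquet";
  sourceAdapters?: Record<string, unknown>;
  maxBackpressureBytes?: number;
}

type ResolvedConfig = Required<GatewayConfigSubset>;

// Hypothetical helper (not part of @lakesync/gateway): fills in the table's defaults.
function resolveGatewayDefaults(config: GatewayConfigSubset): ResolvedConfig {
  return {
    ...config,
    flushFormat: config.flushFormat ?? "parquet",
    sourceAdapters: config.sourceAdapters ?? {},
    // Pushes are rejected above this; defaults to 2 × maxBufferBytes.
    maxBackpressureBytes: config.maxBackpressureBytes ?? 2 * config.maxBufferBytes,
  };
}

const resolved = resolveGatewayDefaults({
  gatewayId: "my-gw",
  maxBufferBytes: 4 * 1024 * 1024,
  maxBufferAgeMs: 30_000,
});
console.log(resolved.flushFormat, resolved.maxBackpressureBytes); // parquet 8388608
```

With a 4 MiB buffer, backpressure therefore starts at 8 MiB unless maxBackpressureBytes is set explicitly.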

Example:

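import { SyncGateway } from "@lakesync/gateway";

// s3Adapter, pgAdapter and slackHandler are placeholders for instances created
// elsewhere (for example an S3Adapter and a PostgresAdapter from @lakesync/adapter).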
const gateway = new SyncGateway({
  gatewayId: "my-gw",
  maxBufferBytes: 4 * 1024 * 1024,
  maxBufferAgeMs: 30_000,
  adapter: s3Adapter,
  sourceAdapters: { postgres: pgAdapter },
  actionHandlers: { slack: slackHandler },
});

Push / Pull

handlePush(msg)

handlePush(
  msg: SyncPush,
): Result<HandlePushResult, ClockDriftError | SchemaError | BackpressureError>;

Checks each delta's HLC against the server clock for excessive drift (ClockDriftError), resolves conflicts with column-level last-writer-wins (LWW), and appends the deltas to the buffer.

  • Idempotent — re-pushing the same deltaId counts as accepted but is not re-ingested.
  • Backpressure — returns BackpressureError when the buffer exceeds maxBackpressureBytes.
  • Schema validation — validates deltas against the configured SchemaManager, if any.
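
The push rules above can be modelled in a few lines. This is a hypothetical in-memory sketch, not SyncGateway's implementation: the SketchDelta shape (the real RowDelta is not shown here), the JSON-length size estimate and the clientId tie-break on equal HLCs are assumptions, and the drift check and schema validation are left out.

```typescript
// Hypothetical, simplified delta shape.
interface SketchDelta {
  deltaId: string;
  clientId: string;
  table: string;
  rowId: string;
  hlc: bigint;
  columns: Record<string, unknown>;
}

interface ColumnWrite {
  value: unknown;
  hlc: bigint;
  clientId: string;
}

type PushOutcome = { ok: true; accepted: number } | { ok: false; error: "BackpressureError" };

class PushBufferSketch {
  readonly rows = new Map<string, Map<string, ColumnWrite>>();
  byteSize = 0;
  private readonly seen = new Set<string>();
  private readonly maxBackpressureBytes: number;

  constructor(maxBackpressureBytes: number) {
    this.maxBackpressureBytes = maxBackpressureBytes;
  }

  push(deltas: ReadonlyArray<SketchDelta>): PushOutcome {
    // Backpressure: refuse the batch while the buffer is already over the limit.
    if (this.byteSize > this.maxBackpressureBytes) {
      return { ok: false, error: "BackpressureError" };
    }
    let accepted = 0;
    for (const d of deltas) {
      accepted++; // a re-pushed deltaId still counts as accepted...
      if (this.seen.has(d.deltaId)) continue; // ...but is not ingested again
      this.seen.add(d.deltaId);
      this.byteSize += estimateBytes(d);
      this.mergeColumns(d);
    }
    return { ok: true, accepted };
  }

  // Column-level LWW: each column keeps whichever write has the higher HLC,
  // so concurrent edits to different columns of one row both survive.
  private mergeColumns(d: SketchDelta): void {
    const key = `${d.table}:${d.rowId}`;
    const row = this.rows.get(key) ?? new Map<string, ColumnWrite>();
    for (const [column, value] of Object.entries(d.columns)) {
      const current = row.get(column);
      if (current === undefined || beats(d, current)) {
        row.set(column, { value, hlc: d.hlc, clientId: d.clientId });
      }
    }
    this.rows.set(key, row);
  }
}

// Equal HLCs fall back to comparing clientId (an assumption, chosen for determinism).
function beats(d: SketchDelta, current: ColumnWrite): boolean {
  if (d.hlc !== current.hlc) return d.hlc > current.hlc;
  return d.clientId > current.clientId;
}

// Rough size estimate; JSON.stringify throws on bigint, hence the replacer.
function estimateBytes(d: SketchDelta): number {
  return JSON.stringify(d, (_key, v) => (typeof v === "bigint" ? v.toString() : v)).length;
}
```

Merging per column rather than per row is what lets one client's edit to title and another's edit to done on the same row both survive.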

handlePull(msg, context?)

// Buffer pull (synchronous)
handlePull(msg: SyncPull, context?: SyncRulesContext): Result<SyncResponse, never>;

// Adapter-sourced pull (async)
handlePull(
  msg: SyncPull & { source: string },
  context?: SyncRulesContext,
): Promise<Result<SyncResponse, AdapterNotFoundError | AdapterError>>;

Two paths:

  1. Buffer pull — returns deltas from the in-memory log since sinceHlc. When a SyncRulesContext is provided, deltas are post-filtered by the client's bucket definitions and JWT claims. Because filtering can leave a page short, the gateway over-fetches 3× the requested limit and retries up to 5 times to fill the page.

  2. Adapter-sourced pull — when msg.source is set, queries the named DatabaseAdapter via queryDeltasSince() instead of the buffer. Sync rules filtering applies if context is provided.
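
The over-fetch loop in the buffer-pull path can be sketched as follows. The fetch callback, the PulledDelta shape and the returned cursor are hypothetical stand-ins (SyncResponse's fields are not listed here), and the sketch reads "retries up to 5 times" as at most five fetches in total.

```typescript
// Minimal stand-ins; the real SyncPull / SyncResponse types are richer.
interface PulledDelta {
  hlc: bigint;
  table: string;
}

// Returns up to `max` deltas with hlc > sinceHlc, in HLC order.
type FetchSince = (sinceHlc: bigint, max: number) => PulledDelta[];

const OVERFETCH_FACTOR = 3; // documented: over-fetch 3× the limit
const MAX_FETCHES = 5; // documented "up to 5"; counting the first fetch is an assumption

function pullFilteredPage(
  fetchSince: FetchSince,
  allowed: (d: PulledDelta) => boolean,
  sinceHlc: bigint,
  limit: number,
): { deltas: PulledDelta[]; cursor: bigint } {
  const page: PulledDelta[] = [];
  let cursor = sinceHlc;
  for (let attempt = 0; attempt < MAX_FETCHES && page.length < limit; attempt++) {
    const batch = fetchSince(cursor, limit * OVERFETCH_FACTOR);
    if (batch.length === 0) break; // log exhausted
    for (const d of batch) {
      cursor = d.hlc; // everything up to here has been inspected
      if (allowed(d)) {
        page.push(d);
        if (page.length === limit) break; // page full: stop at this delta
      }
    }
  }
  return { deltas: page, cursor };
}
```

Advancing the cursor past rejected deltas keeps the next pull from re-scanning them, while stopping exactly at the last returned delta avoids skipping allowed ones.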

Flush

flush()

flush(): Promise<Result<void, FlushError>>;

Drain the entire buffer and write to the configured adapter.

  • Parquet (default) — writes a Parquet file via writeDeltasToParquet.
  • JSON — writes a FlushEnvelope as JSON.
  • Database adapter — calls insertDeltas() directly.
  • On failure, entries are restored to the buffer for retry.
  • Returns FlushError if no adapter is configured or if a flush is already in progress.
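
The drain, restore-on-failure and single-flight rules above can be modelled with a small class. FlushSketch and its write callback are hypothetical (the callback stands in for the Parquet, JSON or insertDeltas() write), and placing restored entries ahead of deltas pushed during the failed write is this sketch's choice, not documented behaviour.

```typescript
type FlushOutcome = { ok: true } | { ok: false; error: string };

// Stand-in for the configured adapter write.
type WriteEntries<T> = (entries: T[]) => Promise<void>;

class FlushSketch<T> {
  entries: T[] = [];
  private flushing = false;
  private readonly write: WriteEntries<T> | undefined;

  constructor(write?: WriteEntries<T>) {
    this.write = write;
  }

  async flush(): Promise<FlushOutcome> {
    const write = this.write;
    if (write === undefined) return { ok: false, error: "FlushError: no adapter configured" };
    if (this.flushing) return { ok: false, error: "FlushError: flush already in progress" };
    this.flushing = true;
    // Drain everything up front so pushes during the write go to a fresh buffer.
    const drained = this.entries;
    this.entries = [];
    try {
      await write(drained);
      return { ok: true };
    } catch (err) {
      // Put the drained entries back, ahead of anything pushed meanwhile, for retry.
      this.entries = [...drained, ...this.entries];
      return { ok: false, error: `FlushError: ${String(err)}` };
    } finally {
      this.flushing = false;
    }
  }
}
```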

flushTable(table)

flushTable(table: string): Promise<Result<void, FlushError>>;

Flush a single table's deltas, leaving other tables in the buffer. Same write logic as flush().

shouldFlush()

shouldFlush(): boolean;

Check if the buffer should be flushed based on maxBufferBytes and maxBufferAgeMs thresholds. Respects adaptiveBufferConfig for wide-column deltas.
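
One way to express the shouldFlush() decision is sketched below. The byte and age checks follow the description above; measuring age from the oldest buffered delta and the whole adaptive branch are assumptions, since this page does not define how wideColumnThreshold and reductionFactor are applied (here: an average delta size above wideColumnThreshold bytes multiplies the byte limit by reductionFactor). BufferSnapshot and shouldFlushSketch are hypothetical names.

```typescript
interface BufferSnapshot {
  byteSize: number; // estimated bytes buffered
  deltaCount: number;
  oldestEntryMs: number | null; // arrival time of the oldest buffered delta (assumed)
}

interface FlushThresholds {
  maxBufferBytes: number;
  maxBufferAgeMs: number;
  adaptiveBufferConfig?: { wideColumnThreshold: number; reductionFactor: number };
}

function shouldFlushSketch(buf: BufferSnapshot, cfg: FlushThresholds, nowMs: number): boolean {
  if (buf.deltaCount === 0) return false;
  let byteLimit = cfg.maxBufferBytes;
  const adaptive = cfg.adaptiveBufferConfig;
  // ASSUMPTION: "wide" = average bytes per delta above wideColumnThreshold.
  if (adaptive && buf.byteSize / buf.deltaCount > adaptive.wideColumnThreshold) {
    byteLimit = cfg.maxBufferBytes * adaptive.reductionFactor;
  }
  const tooBig = buf.byteSize >= byteLimit;
  const tooOld = buf.oldestEntryMs !== null && nowMs - buf.oldestEntryMs >= cfg.maxBufferAgeMs;
  return tooBig || tooOld;
}
```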

getTablesExceedingBudget()

getTablesExceedingBudget(): string[];

Returns the names of tables whose buffered size exceeds perTableBudgetBytes, or an empty array when no budget is configured.
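
The budget check amounts to a filter over per-table sizes. The sketch below takes rows shaped like tableStats (see Monitoring); the tablesExceedingBudget helper is hypothetical, and treating "exceeds" as a strict comparison is an assumption.

```typescript
// Same shape as the entries of tableStats.
interface TableStat {
  table: string;
  byteSize: number;
  deltaCount: number;
}

function tablesExceedingBudget(stats: ReadonlyArray<TableStat>, perTableBudgetBytes?: number): string[] {
  if (perTableBudgetBytes === undefined) return []; // no budget configured
  return stats.filter((s) => s.byteSize > perTableBudgetBytes).map((s) => s.table);
}
```

A periodic task could pass each returned name to flushTable(), so that only over-budget tables are drained while the rest stay buffered.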

rehydrate(deltas)

rehydrate(deltas: ReadonlyArray<RowDelta>): void;

Restore persisted deltas into the buffer without push validation (no HLC drift check, no conflict resolution, no schema validation). Used by GatewayServer to rehydrate from SQLite persistence on startup.

Source Adapters

Named database adapters for adapter-sourced pulls. Clients pull from a source by passing source: "name" in the pull request.

registerSource(name, adapter)

registerSource(name: string, adapter: DatabaseAdapter): void;

unregisterSource(name)

unregisterSource(name: string): void;

listSources()

listSources(): string[];

Example:

import { PostgresAdapter } from "@lakesync/adapter";

const pg = new PostgresAdapter({ connectionString: "..." });
gateway.registerSource("analytics", pg);

// Client can now pull from this source
coordinator.pullFrom("analytics");
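
The registry behind these three methods, and the lookup that makes adapter-sourced pull fail with AdapterNotFoundError or AdapterError, can be sketched as a Map keyed by source name. SourceRegistrySketch and pullFromSource are hypothetical, and the queryDeltasSince(sinceHlc) signature is assumed, since its real parameters are not shown on this page.

```typescript
// Minimal stand-in for the one DatabaseAdapter method adapter-sourced pull uses.
interface SourceAdapter {
  queryDeltasSince(sinceHlc: bigint): Promise<unknown[]>; // assumed signature
}

type SourcePullOutcome =
  | { ok: true; deltas: unknown[] }
  | { ok: false; error: "AdapterNotFoundError" | "AdapterError"; source: string };

class SourceRegistrySketch {
  private readonly sources = new Map<string, SourceAdapter>();

  registerSource(name: string, adapter: SourceAdapter): void {
    this.sources.set(name, adapter); // re-using a name replaces the adapter in this sketch
  }

  unregisterSource(name: string): void {
    this.sources.delete(name);
  }

  listSources(): string[] {
    return Array.from(this.sources.keys());
  }

  // Mirrors the async handlePull path: look the source up by name, then query it.
  async pullFromSource(source: string, sinceHlc: bigint): Promise<SourcePullOutcome> {
    const adapter = this.sources.get(source);
    if (adapter === undefined) return { ok: false, error: "AdapterNotFoundError", source };
    try {
      return { ok: true, deltas: await adapter.queryDeltasSince(sinceHlc) };
    } catch {
      return { ok: false, error: "AdapterError", source };
    }
  }
}
```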

Actions

Execute imperative operations against external systems via registered ActionHandlers. See Actions for the full guide.

handleAction(msg, context?)

handleAction(
  msg: ActionPush,
  context?: AuthContext,
): Promise<Result<ActionResponse, ActionValidationError>>;

Dispatches each action to the matching handler by connector name. Supports:

  • actionId deduplication — re-submitting the same ID returns the cached result.
  • idempotencyKey deduplication — different actions with the same key return the first result.
  • Retryable errors are not cached — the client can retry with the same actionId.
  • Auth context is forwarded to executeAction() for fine-grained permissions.
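
The deduplication rules can be sketched with two caches, one keyed by actionId and one by idempotencyKey. ActionDedupSketch is hypothetical: it omits dispatch by connector name and auth forwarding, and it caches non-retryable failures, which is inferred from the statement that only retryable errors skip the cache.

```typescript
interface ActionSketch {
  actionId: string;
  connector: string;
  idempotencyKey?: string;
}

type ActionOutcome =
  | { ok: true; data: unknown }
  | { ok: false; error: string; retryable: boolean };

type Execute = (action: ActionSketch) => Promise<ActionOutcome>;

class ActionDedupSketch {
  private readonly byActionId = new Map<string, ActionOutcome>();
  private readonly byIdempotencyKey = new Map<string, ActionOutcome>();
  private readonly execute: Execute;

  constructor(execute: Execute) {
    this.execute = execute;
  }

  async run(action: ActionSketch): Promise<ActionOutcome> {
    const cachedById = this.byActionId.get(action.actionId);
    if (cachedById) return cachedById; // same actionId: cached result
    if (action.idempotencyKey !== undefined) {
      const cachedByKey = this.byIdempotencyKey.get(action.idempotencyKey);
      if (cachedByKey) return cachedByKey; // same key, different action: first result
    }
    const outcome = await this.execute(action);
    // Retryable failures are not cached, so the same actionId can be retried.
    if (outcome.ok || !outcome.retryable) {
      this.byActionId.set(action.actionId, outcome);
      if (action.idempotencyKey !== undefined) this.byIdempotencyKey.set(action.idempotencyKey, outcome);
    }
    return outcome;
  }
}
```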

registerActionHandler(name, handler)

registerActionHandler(name: string, handler: ActionHandler): void;

unregisterActionHandler(name)

unregisterActionHandler(name: string): void;

listActionHandlers()

listActionHandlers(): string[];

describeActions()

describeActions(): ActionDiscovery;

Returns an ActionDiscovery whose connectors field maps each connector name to its supported ActionDescriptor[]. Used by the discovery endpoint (GET /sync/:id/actions) and by the client's describeActions().

Example:

// Assumes Ok and the HLCTimestamp type are already imported.
gateway.registerActionHandler("github", {
  supportedActions: [
    { actionType: "create_pr", description: "Create a pull request" },
  ],
  async executeAction(action) {
    // Call GitHub API...
    return Ok({ actionId: action.actionId, data: { url: "..." }, serverHlc: 0n as HLCTimestamp });
  },
});

gateway.describeActions();
// { connectors: { github: [{ actionType: "create_pr", description: "..." }] } }

Monitoring

bufferStats

get bufferStats(): { logSize: number; indexSize: number; byteSize: number };

  • logSize — total number of deltas in the buffer.
  • indexSize — number of unique row keys tracked.
  • byteSize — estimated byte size of buffered deltas.

tableStats

get tableStats(): Array<{ table: string; byteSize: number; deltaCount: number }>;

Per-table breakdown of buffer contents.
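
The three bufferStats counters and the tableStats breakdown can be derived from the delta log in a single pass. The size estimate (length of the JSON encoding) and the row-key format are assumptions, since the gateway's own estimator is not described here, and computeStats is a hypothetical name.

```typescript
// Hypothetical delta shape: table and row id plus arbitrary column values.
interface StatDelta {
  table: string;
  rowId: string;
  [column: string]: unknown;
}

interface TableStat {
  table: string;
  byteSize: number;
  deltaCount: number;
}

// ASSUMPTION: size ≈ length of the JSON encoding (bigints written as strings).
function estimateDeltaBytes(d: StatDelta): number {
  return JSON.stringify(d, (_key, v) => (typeof v === "bigint" ? v.toString() : v)).length;
}

function computeStats(log: ReadonlyArray<StatDelta>) {
  const rowKeys = new Set<string>();
  const perTable = new Map<string, TableStat>();
  let byteSize = 0;
  for (const d of log) {
    const bytes = estimateDeltaBytes(d);
    byteSize += bytes;
    rowKeys.add(JSON.stringify([d.table, d.rowId])); // one key per distinct row
    const stat = perTable.get(d.table) ?? { table: d.table, byteSize: 0, deltaCount: 0 };
    stat.byteSize += bytes;
    stat.deltaCount += 1;
    perTable.set(d.table, stat);
  }
  return {
    // logSize counts every delta; indexSize counts distinct rows.
    bufferStats: { logSize: log.length, indexSize: rowKeys.size, byteSize },
    tableStats: Array.from(perTable.values()),
  };
}
```

Two edits to the same row therefore raise logSize by two but indexSize by one.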

Types

HandlePushResult

interface HandlePushResult {
  serverHlc: HLCTimestamp;
  accepted: number;
  deltas: RowDelta[];
}

FlushEnvelope

interface FlushEnvelope {
  version: 1;
  gatewayId: string;
  createdAt: string;
  hlcRange: { min: HLCTimestamp; max: HLCTimestamp };
  deltaCount: number;
  byteSize: number;
  deltas: RowDelta[];
}
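
Building an envelope from a batch of deltas mostly means computing hlcRange and the counts. One practical detail: HLCTimestamp values appear to be bigints (the action handler example casts 0n to HLCTimestamp), and JSON.stringify throws a TypeError on bigint, so a JSON flush needs a replacer. The sketch encodes HLCs as decimal strings and measures byteSize as the UTF-8 length of the serialised deltas; both choices are assumptions, as are the buildEnvelope and serialiseEnvelope names.

```typescript
type HLC = bigint; // stand-in for HLCTimestamp

interface EnvelopeDelta {
  hlc: HLC;
  table: string;
  rowId: string;
}

interface FlushEnvelopeSketch {
  version: 1;
  gatewayId: string;
  createdAt: string;
  hlcRange: { min: HLC; max: HLC };
  deltaCount: number;
  byteSize: number;
  deltas: EnvelopeDelta[];
}

const bigintAsString = (_key: string, value: unknown) =>
  typeof value === "bigint" ? value.toString() : value;

function buildEnvelope(gatewayId: string, deltas: EnvelopeDelta[], now = new Date()): FlushEnvelopeSketch {
  if (deltas.length === 0) throw new Error("nothing to flush");
  let min = deltas[0].hlc;
  let max = deltas[0].hlc;
  for (const d of deltas) {
    if (d.hlc < min) min = d.hlc;
    if (d.hlc > max) max = d.hlc;
  }
  return {
    version: 1,
    gatewayId,
    createdAt: now.toISOString(),
    hlcRange: { min, max },
    deltaCount: deltas.length,
    // ASSUMPTION: byteSize = UTF-8 size of the serialised deltas.
    byteSize: new TextEncoder().encode(JSON.stringify(deltas, bigintAsString)).length,
    deltas,
  };
}

// Plain JSON.stringify(envelope) would throw because of the bigint HLCs.
function serialiseEnvelope(env: FlushEnvelopeSketch): string {
  return JSON.stringify(env, bigintAsString);
}
```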

GatewayState

interface GatewayState {
  hlc: HLCTimestamp;
  flushing: boolean;
}

Gateway Server

The @lakesync/gateway-server package wraps SyncGateway in a standalone HTTP server for self-hosted deployments. See Architecture for deployment options.

Routes

| Method | Path | Auth | Description |
|---|---|---|---|
| POST | /sync/:id/push | JWT | Push deltas |
| POST | /sync/:id/pull | JWT | Pull deltas |
| POST | /sync/:id/action | JWT | Execute actions |
| GET | /sync/:id/actions | JWT | Discover available actions |
| GET | /sync/:id/checkpoint | JWT | Download checkpoint |
| POST | /admin/flush/:id | JWT | Trigger flush |
| POST | /admin/schema/:id | JWT | Update schema |
| POST | /admin/sync-rules/:id | JWT | Update sync rules |
| POST | /admin/connectors/:id | JWT (admin) | Register a connector |
| GET | /admin/connectors/:id | JWT (admin) | List connectors |
| DELETE | /admin/connectors/:id/:name | JWT (admin) | Unregister a connector |
| GET | /sync/:id/metrics | JWT | Buffer statistics |
| GET | /health | None | Health check |
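
A client call to one of these routes might be built as below. buildPushRequest is hypothetical, and two details are assumptions not stated on this page: the JWT is sent as an Authorization: Bearer header, and the body is the SyncPush message encoded as JSON (with bigint HLCs turned into strings). The function only constructs a standard Fetch API Request; passing it to fetch() sends it.

```typescript
// Builds, but does not send, a request for POST /sync/:id/push.
function buildPushRequest(baseUrl: string, gatewayId: string, jwt: string, syncPush: unknown): Request {
  const url = new URL(`/sync/${encodeURIComponent(gatewayId)}/push`, baseUrl);
  return new Request(url, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${jwt}`, // ASSUMPTION: bearer-token scheme
    },
    // ASSUMPTION: JSON body; bigints are not JSON-serialisable, so stringify them.
    body: JSON.stringify(syncPush, (_key, v) => (typeof v === "bigint" ? v.toString() : v)),
  });
}
```

For example, `await fetch(buildPushRequest("http://localhost:8080", "my-gw", token, push))` against a GatewayServer started with the configuration below.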

Configuration

import { GatewayServer } from "@lakesync/gateway-server";

const server = new GatewayServer({
  gatewayId: "my-gw",
  port: 8080,
  adapter: s3Adapter,                    // optional storage adapter
  maxBufferBytes: 4 * 1024 * 1024,       // default 4 MiB
  maxBufferAgeMs: 30_000,                // default 30s
  jwtSecret: process.env.JWT_SECRET,     // optional — disables auth when absent
  allowedOrigins: ["http://localhost:5173"], // optional CORS origins
  persistence: "sqlite",                 // "memory" (default) or "sqlite"
  flushIntervalMs: 30_000,               // periodic flush interval (default 30s)
  ingestSources: [],                     // optional polling ingest sources
});

await server.start();

Gateway Worker (Cloudflare)

The @lakesync/gateway-worker package deploys SyncGateway as a Cloudflare Worker with Durable Objects.

Routes

Same as gateway-server above. All routes except /health require JWT authentication.

Table Sharding

Optional table-level sharding across multiple Durable Objects:

{
  "shards": {
    "orders": "shard-orders",
    "products": "shard-products"
  },
  "default": "shard-default"
}

Set via SHARD_CONFIG environment variable (JSON string). When absent, sharding is disabled.

  • Push — partitions deltas by table and fans out to the correct shard DOs.
  • Pull — fans out to all shards, merges results sorted by HLC.
  • Admin ops — applied to all shards.
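
The routing described above can be sketched with three small functions: parse SHARD_CONFIG, partition pushed deltas by owning shard, and merge pulled results by HLC. The function names and the ShardDelta shape are hypothetical; sending unlisted tables to the shard named by the config's "default" key is read from the config format shown above.

```typescript
interface ShardConfig {
  shards: Record<string, string>; // table name -> shard DO name
  default: string;
}

interface ShardDelta {
  table: string;
  hlc: bigint;
}

// SHARD_CONFIG arrives as a JSON string; absent means sharding is disabled.
function parseShardConfig(raw: string | undefined): ShardConfig | null {
  return raw ? (JSON.parse(raw) as ShardConfig) : null;
}

// Push: group deltas by the shard that owns their table.
function partitionByShard<D extends ShardDelta>(cfg: ShardConfig, deltas: ReadonlyArray<D>): Map<string, D[]> {
  const out = new Map<string, D[]>();
  for (const d of deltas) {
    // hasOwnProperty avoids matching inherited keys such as "constructor".
    const shard = Object.prototype.hasOwnProperty.call(cfg.shards, d.table) ? cfg.shards[d.table] : cfg.default;
    const bucket = out.get(shard);
    if (bucket) bucket.push(d);
    else out.set(shard, [d]);
  }
  return out;
}

// Pull: merge per-shard results into one list ordered by HLC.
function mergeByHlc<D extends ShardDelta>(perShard: ReadonlyArray<ReadonlyArray<D>>): D[] {
  const all: D[] = [];
  for (const part of perShard) all.push(...part);
  return all.sort((a, b) => (a.hlc < b.hlc ? -1 : a.hlc > b.hlc ? 1 : 0));
}
```

The comparator avoids subtracting bigints, because sort() expects a number and `a.hlc - b.hlc` would be a bigint.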