Gateway
API reference for @lakesync/gateway — SyncGateway, configuration, push/pull, flush, sources, actions, and monitoring.
The @lakesync/gateway package provides SyncGateway — the server-side engine that coordinates delta ingestion, conflict resolution, buffering, flush, source adapters, and action dispatch.
SyncGateway
Constructor
class SyncGateway {
constructor(config: GatewayConfig, adapter?: LakeAdapter | DatabaseAdapter);
}The adapter parameter is a convenience shorthand for config.adapter.
Note:
LakeAdapterandDatabaseAdapterare imported from@lakesync/core(not@lakesync/adapter). The adapter package provides concrete implementations (S3Adapter, PostgresAdapter, etc.), while the interfaces live in core.
GatewayConfig
interface GatewayConfig {
gatewayId: string;
maxBufferBytes: number;
maxBufferAgeMs: number;
flushFormat?: "json" | "parquet";
tableSchema?: TableSchema;
catalogue?: NessieCatalogueClient;
schemaManager?: SchemaManager;
adapter?: LakeAdapter | DatabaseAdapter;
sourceAdapters?: Record<string, DatabaseAdapter>;
adaptiveBufferConfig?: {
wideColumnThreshold: number;
reductionFactor: number;
};
maxBackpressureBytes?: number;
perTableBudgetBytes?: number;
actionHandlers?: Record<string, ActionHandler>;
schemas?: ReadonlyArray<TableSchema>;
}| Field | Default | Description |
|---|---|---|
gatewayId | required | Unique identifier for this gateway instance |
maxBufferBytes | required | Buffer byte threshold for flush |
maxBufferAgeMs | required | Buffer age threshold for flush (ms) |
flushFormat | "parquet" | Output format for flush ("json" or "parquet") |
tableSchema | — | Required for Parquet flush |
adapter | — | Storage adapter for flush (S3/R2, Postgres, MySQL, BigQuery) |
sourceAdapters | {} | Named database adapters for adapter-sourced pull |
maxBackpressureBytes | 2 × maxBufferBytes | Reject pushes above this threshold |
perTableBudgetBytes | — | Per-table budget for auto-flush |
actionHandlers | — | Named action handlers for imperative actions |
schemas | — | Table schemas for materialisation after flush (see Materialise Protocol) |
Example:
import { SyncGateway } from "@lakesync/gateway";
const gateway = new SyncGateway({
gatewayId: "my-gw",
maxBufferBytes: 4 * 1024 * 1024,
maxBufferAgeMs: 30_000,
adapter: s3Adapter,
sourceAdapters: { postgres: pgAdapter },
actionHandlers: { slack: slackHandler },
});Push / Pull
handlePush(msg)
handlePush(
msg: SyncPush,
): Result<HandlePushResult, ClockDriftError | SchemaError | BackpressureError>;Validates HLC drift, resolves conflicts via column-level LWW, and appends deltas to the buffer.
- Idempotent — re-pushing the same
deltaIdcounts as accepted but is not re-ingested. - Backpressure — returns
BackpressureErrorwhen buffer exceedsmaxBackpressureBytes. - Schema validation — validates against
SchemaManagerif configured.
handlePull(msg, context?)
// Buffer pull (synchronous)
handlePull(msg: SyncPull, context?: SyncRulesContext): Result<SyncResponse, never>;
// Adapter-sourced pull (async)
handlePull(
msg: SyncPull & { source: string },
context?: SyncRulesContext,
): Promise<Result<SyncResponse, AdapterNotFoundError | AdapterError>>;Two paths:
-
Buffer pull — returns deltas from the in-memory log since
sinceHlc. When aSyncRulesContextis provided, deltas are post-filtered by the client's bucket definitions and JWT claims. Over-fetches 3× the limit and retries up to 5 times to fill the page after filtering. -
Adapter-sourced pull — when
msg.sourceis set, queries the namedDatabaseAdapterviaqueryDeltasSince()instead of the buffer. Sync rules filtering applies if context is provided.
Flush
flush()
flush(): Promise<Result<void, FlushError>>;Drain the entire buffer and write to the configured adapter.
- Parquet (default) — writes a Parquet file via
writeDeltasToParquet. - JSON — writes a
FlushEnvelopeas JSON. - Database adapter — calls
insertDeltas()directly. - On failure, entries are restored to the buffer for retry.
- Returns
FlushErrorif no adapter is configured or if a flush is already in progress.
flushTable(table)
flushTable(table: string): Promise<Result<void, FlushError>>;Flush a single table's deltas, leaving other tables in the buffer. Same write logic as flush().
shouldFlush()
shouldFlush(): boolean;Check if the buffer should be flushed based on maxBufferBytes and maxBufferAgeMs thresholds. Respects adaptiveBufferConfig for wide-column deltas.
getTablesExceedingBudget()
getTablesExceedingBudget(): string[];Returns table names whose buffer size exceeds perTableBudgetBytes. Returns empty array when no budget is configured.
rehydrate(deltas)
rehydrate(deltas: ReadonlyArray<RowDelta>): void;Restore persisted deltas into the buffer without push validation (no HLC drift check, no conflict resolution, no schema validation). Used by GatewayServer to rehydrate from SQLite persistence on startup.
Source Adapters
Named database adapters for adapter-sourced pulls. Clients pull from a source by passing source: "name" in the pull request.
registerSource(name, adapter)
registerSource(name: string, adapter: DatabaseAdapter): void;unregisterSource(name)
unregisterSource(name: string): void;listSources()
listSources(): string[];Example:
import { PostgresAdapter } from "@lakesync/adapter";
const pg = new PostgresAdapter({ connectionString: "..." });
gateway.registerSource("analytics", pg);
// Client can now pull from this source
coordinator.pullFrom("analytics");Actions
Execute imperative operations against external systems via registered ActionHandlers. See Actions for the full guide.
handleAction(msg, context?)
handleAction(
msg: ActionPush,
context?: AuthContext,
): Promise<Result<ActionResponse, ActionValidationError>>;Dispatches each action to the matching handler by connector name. Supports:
actionIddeduplication — re-submitting the same ID returns the cached result.idempotencyKeydeduplication — different actions with the same key return the first result.- Retryable errors are not cached — the client can retry with the same
actionId. - Auth context is forwarded to
executeAction()for fine-grained permissions.
registerActionHandler(name, handler)
registerActionHandler(name: string, handler: ActionHandler): void;unregisterActionHandler(name)
unregisterActionHandler(name: string): void;listActionHandlers()
listActionHandlers(): string[];describeActions()
describeActions(): ActionDiscovery;Returns a map of connector name → supported ActionDescriptor[]. Used by the discovery endpoint (GET /sync/:id/actions) and client's describeActions().
Example:
gateway.registerActionHandler("github", {
supportedActions: [
{ actionType: "create_pr", description: "Create a pull request" },
],
async executeAction(action) {
// Call GitHub API...
return Ok({ actionId: action.actionId, data: { url: "..." }, serverHlc: 0n as HLCTimestamp });
},
});
gateway.describeActions();
// { connectors: { github: [{ actionType: "create_pr", description: "..." }] } }Monitoring
bufferStats
get bufferStats(): { logSize: number; indexSize: number; byteSize: number };logSize— total number of deltas in the buffer.indexSize— number of unique row keys tracked.byteSize— estimated byte size of buffered deltas.
tableStats
get tableStats(): Array<{ table: string; byteSize: number; deltaCount: number }>;Per-table breakdown of buffer contents.
Types
HandlePushResult
interface HandlePushResult {
serverHlc: HLCTimestamp;
accepted: number;
deltas: RowDelta[];
}FlushEnvelope
interface FlushEnvelope {
version: 1;
gatewayId: string;
createdAt: string;
hlcRange: { min: HLCTimestamp; max: HLCTimestamp };
deltaCount: number;
byteSize: number;
deltas: RowDelta[];
}GatewayState
interface GatewayState {
hlc: HLCTimestamp;
flushing: boolean;
}Gateway Server
The @lakesync/gateway-server package wraps SyncGateway in a standalone HTTP server for self-hosted deployments. See Architecture for deployment options.
Routes
| Method | Path | Auth | Description |
|---|---|---|---|
POST | /sync/:id/push | JWT | Push deltas |
POST | /sync/:id/pull | JWT | Pull deltas |
POST | /sync/:id/action | JWT | Execute actions |
GET | /sync/:id/actions | JWT | Discover available actions |
GET | /sync/:id/checkpoint | JWT | Download checkpoint |
POST | /admin/flush/:id | JWT | Trigger flush |
POST | /admin/schema/:id | JWT | Update schema |
POST | /admin/sync-rules/:id | JWT | Update sync rules |
POST | /admin/connectors/:id | JWT (admin) | Register a connector |
GET | /admin/connectors/:id | JWT (admin) | List connectors |
DELETE | /admin/connectors/:id/:name | JWT (admin) | Unregister a connector |
GET | /sync/:id/metrics | JWT | Buffer statistics |
GET | /health | None | Health check |
Configuration
import { GatewayServer } from "@lakesync/gateway-server";
const server = new GatewayServer({
gatewayId: "my-gw",
port: 8080,
adapter: s3Adapter, // optional storage adapter
maxBufferBytes: 4 * 1024 * 1024, // default 4 MiB
maxBufferAgeMs: 30_000, // default 30s
jwtSecret: process.env.JWT_SECRET, // optional — disables auth when absent
allowedOrigins: ["http://localhost:5173"], // optional CORS origins
persistence: "sqlite", // "memory" (default) or "sqlite"
flushIntervalMs: 30_000, // periodic flush interval (default 30s)
ingestSources: [], // optional polling ingest sources
});
await server.start();Gateway Worker (Cloudflare)
The @lakesync/gateway-worker package deploys SyncGateway as a Cloudflare Worker with Durable Objects.
Routes
Same as gateway-server above. All routes except /health require JWT authentication.
Table Sharding
Optional table-level sharding across multiple Durable Objects:
{
"shards": {
"orders": "shard-orders",
"products": "shard-products"
},
"default": "shard-default"
}Set via SHARD_CONFIG environment variable (JSON string). When absent, sharding is disabled.
- Push — partitions deltas by table and fans out to the correct shard DOs.
- Pull — fans out to all shards, merges results sorted by HLC.
- Admin ops — applied to all shards.