LakeSync

Core API

API reference for @lakesync/core — HLC, Delta, Result, conflict resolution, sync rules, connector types, and validation.

The @lakesync/core package provides the foundational types and functions used across the LakeSync ecosystem.

HLC

Hybrid Logical Clock for causal ordering of events.

HLC

class HLC {
  constructor(wallClock?: () => number);
  now(): HLCTimestamp;
  recv(remote: HLCTimestamp): Result<HLCTimestamp, ClockDriftError>;

  static encode(wall: number, counter: number): HLCTimestamp;
  static decode(ts: HLCTimestamp): { wall: number; counter: number };
  static compare(a: HLCTimestamp, b: HLCTimestamp): -1 | 0 | 1;

  static readonly MAX_DRIFT_MS: 5_000;
  static readonly MAX_COUNTER: 0xffff;
}
  • now() — Generate a new monotonically increasing timestamp.
  • recv(remote) — Merge a remote timestamp, advancing the local clock. Returns Err(ClockDriftError) if drift exceeds 5 seconds.
  • encode(wall, counter) — Pack wall clock ms and counter into an HLCTimestamp.
  • decode(ts) — Unpack an HLCTimestamp into its wall and counter components.
  • compare(a, b) — Compare two timestamps. Returns -1, 0, or 1.

HLCTimestamp

A branded bigint encoding 48 bits of wall-clock milliseconds and 16 bits of a monotonic counter.

type HLCTimestamp = bigint & { readonly __brand: "HLCTimestamp" };

Delta

RowDelta

interface RowDelta {
  op: DeltaOp;
  table: string;
  rowId: string;
  clientId: string;
  columns: ColumnDelta[];
  hlc: HLCTimestamp;
  deltaId: string;
}

type DeltaOp = "INSERT" | "UPDATE" | "DELETE";

interface ColumnDelta {
  column: string;
  value: unknown;
}

TableSchema

interface TableSchema {
  table: string;
  sourceTable?: string;
  columns: Array<{ name: string; type: "string" | "number" | "boolean" | "json" | "null" }>;
  primaryKey?: string[];
  softDelete?: boolean;
  externalIdColumn?: string;
}
FieldDefaultDescription
tablerequiredDestination table name
sourceTabletableDelta table name to match against (for renaming destination tables)
columnsrequiredColumn definitions with type hints
primaryKey["row_id"]Composite primary key columns. Each entry must be "row_id" or exist in columns.
softDeletetrueWhen true, tombstoned rows get deleted_at set instead of being hard-deleted
externalIdColumnColumn for upsert conflict resolution (adds a UNIQUE constraint). Must exist in columns.

SyncPush / SyncPull / SyncResponse

interface SyncPush {
  clientId: string;
  deltas: RowDelta[];
  lastSeenHlc: HLCTimestamp;
}

interface SyncPull {
  clientId: string;
  sinceHlc: HLCTimestamp;
  maxDeltas: number;
  source?: string;
}

interface SyncResponse {
  deltas: RowDelta[];
  serverHlc: HLCTimestamp;
  hasMore: boolean;
}

extractDelta(before, after, opts)

function extractDelta(
  before: Record<string, unknown> | null | undefined,
  after: Record<string, unknown> | null | undefined,
  opts: {
    table: string;
    rowId: string;
    clientId: string;
    hlc: HLCTimestamp;
    schema?: TableSchema;
  },
): Promise<RowDelta | null>;

Extract a column-level delta between two row states. Returns null if nothing changed. Generates a deterministic deltaId via SHA-256 of the stable-stringified payload.

  • null before + present after = INSERT
  • present before + null after = DELETE
  • both present = UPDATE (only changed columns)

applyDelta(row, delta)

function applyDelta(
  row: Record<string, unknown> | null,
  delta: RowDelta,
): Record<string, unknown> | null;

Apply a delta to an existing row, returning the merged result. DELETE returns null, INSERT creates a new row, UPDATE merges columns onto the existing row (immutable).

Result

Type-safe error handling without exceptions.

Types

type Result<T, E = LakeSyncError> = { ok: true; value: T } | { ok: false; error: E };

Constructors

function Ok<T>(value: T): Result<T, never>;
function Err<E>(error: E): Result<never, E>;

Utilities

function mapResult<T, U, E>(result: Result<T, E>, fn: (value: T) => U): Result<U, E>;
function flatMapResult<T, U, E>(result: Result<T, E>, fn: (value: T) => Result<U, E>): Result<U, E>;
function unwrapOrThrow<T, E>(result: Result<T, E>): T;
function fromPromise<T>(promise: Promise<T>): Promise<Result<T, Error>>;

Error Classes

class LakeSyncError extends Error {
  readonly code: string;
  override readonly cause?: Error;
  constructor(message: string, code: string, cause?: Error);
}

class ClockDriftError extends LakeSyncError {}   // code: "CLOCK_DRIFT"
class ConflictError extends LakeSyncError {}     // code: "CONFLICT"
class FlushError extends LakeSyncError {}        // code: "FLUSH_FAILED"
class SchemaError extends LakeSyncError {}       // code: "SCHEMA_MISMATCH"
class AdapterError extends LakeSyncError {}      // code: "ADAPTER_ERROR"
class AdapterNotFoundError extends LakeSyncError {} // code: "ADAPTER_NOT_FOUND"
class BackpressureError extends LakeSyncError {} // code: "BACKPRESSURE"

Conflict Resolution

resolveLWW(local, remote)

function resolveLWW(
  local: RowDelta,
  remote: RowDelta,
): Result<RowDelta, ConflictError>;

Column-level Last-Write-Wins merge. For each column present in both deltas, the one with the higher HLC wins. When HLC timestamps are equal, the lexicographically higher clientId wins (deterministic tiebreak).

LWWResolver

class LWWResolver implements ConflictResolver {
  resolve(local: RowDelta, remote: RowDelta): Result<RowDelta, ConflictError>;
}

interface ConflictResolver {
  resolve(local: RowDelta, remote: RowDelta): Result<RowDelta, ConflictError>;
}

Sync Rules

Types

interface SyncRulesConfig {
  version: number;
  buckets: BucketDefinition[];
}

interface BucketDefinition {
  name: string;
  tables: string[];
  filters: SyncRuleFilter[];
}

interface SyncRuleFilter {
  column: string;
  op: SyncRuleOp;
  value: string;
}

type SyncRuleOp = "eq" | "in" | "neq" | "gt" | "lt" | "gte" | "lte";

interface SyncRulesContext {
  claims: ResolvedClaims;
  rules: SyncRulesConfig;
}

type ResolvedClaims = Record<string, string | string[]>;

filterDeltas(deltas, context)

function filterDeltas(
  deltas: RowDelta[],
  context: SyncRulesContext,
): RowDelta[];

Pure function that returns the union of deltas matching any bucket's filters. JWT claim references (prefixed with jwt:) are resolved from the context's claims object.

validateSyncRules(config)

function validateSyncRules(
  config: unknown,
): Result<void, SyncRuleError>;

Validates structural correctness of a sync rules configuration: version, bucket names, filter operators, and uniqueness.

resolveClientBuckets(rules, claims)

function resolveClientBuckets(
  rules: SyncRulesConfig,
  claims: ResolvedClaims,
): string[];

Returns the names of buckets a client matches based on their JWT claims.

Validation

isValidIdentifier(name)

function isValidIdentifier(name: string): boolean;

Check whether a string is a valid SQL identifier (starts with letter or underscore, alphanumeric + underscore, max 64 chars).

assertValidIdentifier(name)

function assertValidIdentifier(name: string): Result<void, SchemaError>;

Returns Ok if valid, Err(SchemaError) if invalid.

quoteIdentifier(name)

function quoteIdentifier(name: string): string;

Double-quote a SQL identifier with proper escaping.

Actions

Types for imperative actions against external systems. See Actions for the full guide.

Action Types

interface Action {
  actionId: string;
  clientId: string;
  hlc: HLCTimestamp;
  connector: string;
  actionType: string;
  params: Record<string, unknown>;
  idempotencyKey?: string;
}

interface ActionPush {
  clientId: string;
  actions: Action[];
}

interface ActionResponse {
  results: Array<ActionResult | ActionErrorResult>;
  serverHlc: HLCTimestamp;
}

interface ActionResult {
  actionId: string;
  data: Record<string, unknown>;
  serverHlc: HLCTimestamp;
}

interface ActionErrorResult {
  actionId: string;
  code: string;
  message: string;
  retryable: boolean;
}

Action Discovery

interface ActionDescriptor {
  actionType: string;
  description: string;
  paramsSchema?: Record<string, unknown>;
}

interface ActionDiscovery {
  connectors: Record<string, ActionDescriptor[]>;
}

ActionHandler

interface ActionHandler {
  readonly supportedActions: ActionDescriptor[];
  executeAction(
    action: Action,
    context?: AuthContext,
  ): Promise<Result<ActionResult, ActionExecutionError | ActionNotSupportedError>>;
}

function isActionHandler(obj: unknown): obj is ActionHandler;

Action Errors

class ActionValidationError extends LakeSyncError {}   // code: "ACTION_VALIDATION_ERROR"
class ActionExecutionError extends LakeSyncError {      // code: "ACTION_EXECUTION_ERROR"
  readonly retryable: boolean;
}
class ActionNotSupportedError extends LakeSyncError {}  // code: "ACTION_NOT_SUPPORTED"

Utilities

function validateAction(action: Action): Result<void, ActionValidationError>;
function generateActionId(action: Omit<Action, "actionId">): Promise<string>;
function isActionError(result: ActionResult | ActionErrorResult): result is ActionErrorResult;

Connector Types

Types for dynamic connector registration. See Dynamic Connectors for the full API.

type ConnectorType = "postgres" | "mysql" | "bigquery" | "jira" | "salesforce";

interface ConnectorConfigBase {
  type: string;
  name: string;
  ingest?: ConnectorIngestConfig;
}

type ConnectorConfig =
  | PostgresConnectorConfigFull
  | MySQLConnectorConfigFull
  | BigQueryConnectorConfigFull
  | JiraConnectorConfigFull
  | SalesforceConnectorConfigFull
  | (ConnectorConfigBase & Record<string, unknown>);

function validateConnectorConfig(
  input: unknown,
): Result<ConnectorConfig, ConnectorValidationError>;

The open catch-all variant allows custom connector types to pass through without modifying core. Existing switch statements still work for known types.

Adapter Interfaces

Core adapter types and type guards. These interfaces are defined in @lakesync/core and re-exported by @lakesync/adapter for backward compatibility.

LakeAdapter

interface LakeAdapter {
  putObject(path: string, data: Uint8Array, contentType?: string): Promise<Result<void, AdapterError>>;
  getObject(path: string): Promise<Result<Uint8Array, AdapterError>>;
  headObject(path: string): Promise<Result<{ size: number; lastModified: Date }, AdapterError>>;
  listObjects(prefix: string): Promise<Result<ObjectInfo[], AdapterError>>;
  deleteObject(path: string): Promise<Result<void, AdapterError>>;
  deleteObjects(paths: string[]): Promise<Result<void, AdapterError>>;
}

interface ObjectInfo {
  key: string;
  size: number;
  lastModified: Date;
}

DatabaseAdapter

interface DatabaseAdapter {
  insertDeltas(deltas: RowDelta[]): Promise<Result<void, AdapterError>>;
  queryDeltasSince(hlc: HLCTimestamp, tables?: string[]): Promise<Result<RowDelta[], AdapterError>>;
  getLatestState(table: string, rowId: string): Promise<Result<Record<string, unknown> | null, AdapterError>>;
  ensureSchema(schema: TableSchema): Promise<Result<void, AdapterError>>;
  close(): Promise<void>;
}

function isDatabaseAdapter(adapter: unknown): adapter is DatabaseAdapter;

Materialisable

interface Materialisable {
  materialise(deltas: RowDelta[], schemas: ReadonlyArray<TableSchema>): Promise<Result<void, AdapterError>>;
}

function isMaterialisable(adapter: unknown): adapter is Materialisable;

Opt-in capability for adapters that can materialise deltas into queryable destination tables. All three database adapters (PostgresAdapter, MySQLAdapter, BigQueryAdapter) implement this interface via the generic SqlDialect + executeMaterialise() pattern. See Materialise Protocol for details.

Source Polling

Base class and types for building source pollers. See Source Polling Ingest for usage.

abstract class BaseSourcePoller {
  constructor(config: {
    name: string;
    intervalMs: number;
    gateway: PushTarget;
    memory?: PollerMemoryConfig;
  });

  start(): void;
  stop(): void;
  get isRunning(): boolean;

  abstract poll(): Promise<void>;

  protected pushDeltas(deltas: RowDelta[]): void;
  protected accumulateDelta(delta: RowDelta): Promise<void>;
  protected flushAccumulator(): Promise<void>;
}

interface PushTarget {
  handlePush(push: SyncPush): unknown;
}

interface IngestTarget extends PushTarget {
  flush(): Promise<Result<void, FlushError>>;
  shouldFlush(): boolean;
  readonly bufferStats: { logSize: number; indexSize: number; byteSize: number };
}

function isIngestTarget(target: PushTarget): target is IngestTarget;

interface PollerMemoryConfig {
  chunkSize?: number;
  memoryBudgetBytes?: number;
  flushThreshold?: number;
}

Poller Factories

Registry-based creation of source pollers by connector type.

interface PollerRegistry {
  get(type: string): PollerFactory | undefined;
  with(type: string, factory: PollerFactory): PollerRegistry;
}

function createPollerRegistry(factories?: Map<string, PollerFactory>): PollerRegistry;

function createPoller(config: ConnectorConfig, gateway: PushTarget, registry: PollerRegistry): BaseSourcePoller;

type PollerFactory = (config: ConnectorConfig, gateway: PushTarget) => BaseSourcePoller;

Build a PollerRegistry with the factories you need. Connector packages export their factories as named exports (e.g. jiraPollerFactory from @lakesync/connector-jira).

CallbackPushTarget

A simple PushTarget that forwards pushes to a callback. Useful for testing.

class CallbackPushTarget implements PushTarget {
  constructor(fn: (push: SyncPush) => unknown);
}

Auth

function verifyToken(
  token: string,
  secret: string,
): Promise<Result<AuthClaims, AuthError>>;

interface AuthClaims {
  clientId: string;
  gatewayId: string;
  role: string;
  customClaims: Record<string, string | string[]>;
}

class AuthError extends Error {
  constructor(message: string);
}

Verifies a JWT signed with HMAC-SHA256 using the Web Crypto API. Checks expiry, required claims (sub, gw, exp), and extracts custom claims for sync rules evaluation. The role claim defaults to "client" if absent.